Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server facing code and shares libpqwalreceiver
19 : * module with walreceiver for providing the libpq specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option as on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions has
33 : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which is then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in tmp file directory by default, and the filenames
38 : * include both the XID of the toplevel transaction and OID of the
39 : * subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID don't interfere.
41 : *
42 : * We use BufFiles instead of using normal temporary files because (a) the
43 : * BufFile infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) provides a way for automatic cleanup on error and (c) provides
45 : * a way for these files to survive across local transactions and allows them
46 : * to be opened and closed at stream start and stop. We decided to use FileSet
47 : * infrastructure as without that it deletes the files on the closure of the
48 : * file and if we decide to keep stream files open across the start/stop stream
49 : * then it will consume a lot of memory (more than 8K for each BufFile and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we don't use FileSet then we also need to invent
53 : * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 : * the file we desired across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option as parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : * tablesync worker might not get such a prepared transaction because say it
72 : * was prior to the initial consistent point but might have got some later
73 : * commits. Now, the tablesync worker will exit without doing anything for the
74 : * prepared transaction skipped by the apply worker as the sync location for it
75 : * will be already ahead of the apply worker's current location. This would lead
76 : * to an "empty prepare", because later when the apply worker does the commit
77 : * prepare, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this, and similar prepare confusions the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * ProcessSyncingTablesForApply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING it starts streaming messages with the
98 : * two_phase option which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time we have not enabled two_phase, the
101 : * publisher (replication server) would have skipped some prepares but we
102 : * ensure that such prepares are sent along with commit prepare, see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * Finally, to avoid problems mentioned in previous paragraphs from any
113 : * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
114 : * to 'off' and then again back to 'on') there is a restriction for
115 : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 : * the two_phase tri-state is ENABLED, except when copy_data = false.
117 : *
118 : * We can get prepare of the same GID more than once for the genuine cases
119 : * where we have defined multiple subscriptions for publications on the same
120 : * server and prepared transaction has operations on tables subscribed to those
121 : * subscriptions. For such cases, if we use the GID sent by publisher one of
122 : * the prepares will be successful and others will fail, in which case the
123 : * server will send them again. Now, this can lead to a deadlock if user has
124 : * set synchronous_standby_names for all the subscriptions on subscriber. To
125 : * avoid such deadlocks, we generate a unique GID (consisting of the
126 : * subscription oid and the xid of the prepared transaction) for each prepare
127 : * transaction on the subscriber.
128 : *
129 : * FAILOVER
130 : * ----------------------
131 : * The logical slot on the primary can be synced to the standby by specifying
132 : * failover = true when creating the subscription. Enabling failover allows us
133 : * to smoothly transition to the promoted standby, ensuring that we can
134 : * subscribe to the new primary without losing any data.
135 : *
136 : * RETAIN DEAD TUPLES
137 : * ----------------------
138 : * Each apply worker that enabled retain_dead_tuples option maintains a
139 : * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
140 : * prevent dead rows from being removed prematurely when the apply worker still
141 : * needs them to detect update_deleted conflicts. Additionally, this helps to
142 : * retain the required commit_ts module information, which further helps to
143 : * detect update_origin_differs and delete_origin_differs conflicts reliably, as
144 : * otherwise, vacuum freeze could remove the required information.
145 : *
146 : * The logical replication launcher manages an internal replication slot named
147 : * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 : * transaction ID from all apply workers to determine the appropriate xmin for
149 : * the slot, thereby retaining necessary tuples.
150 : *
151 : * The non-removable transaction ID in the apply worker is advanced to the
152 : * oldest running transaction ID once all concurrent transactions on the
153 : * publisher have been applied and flushed locally. The process involves:
154 : *
155 : * - RDT_GET_CANDIDATE_XID:
156 : * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 : * candidate xid.
158 : *
159 : * - RDT_REQUEST_PUBLISHER_STATUS:
160 : * Send a message to the walsender requesting the publisher status, which
161 : * includes the latest WAL write position and information about transactions
162 : * that are in the commit phase.
163 : *
164 : * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 : * Wait for the status from the walsender. After receiving the first status,
166 : * do not proceed if there are concurrent remote transactions that are still
167 : * in the commit phase. These transactions might have been assigned an
168 : * earlier commit timestamp but have not yet written the commit WAL record.
169 : * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 : * until all these transactions have completed.
171 : *
172 : * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 : * Advance the non-removable transaction ID if the current flush location has
174 : * reached or surpassed the last received WAL position.
175 : *
176 : * - RDT_STOP_CONFLICT_INFO_RETENTION:
177 : * This phase is required only when max_retention_duration is defined. We
178 : * enter this phase if the wait time in either the
179 : * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
180 : * configured max_retention_duration. In this phase,
181 : * pg_subscription.subretentionactive is updated to false within a new
182 : * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
183 : *
184 : * - RDT_RESUME_CONFLICT_INFO_RETENTION:
185 : * This phase is required only when max_retention_duration is defined. We
186 : * enter this phase if the retention was previously stopped, and the time
187 : * required to advance the non-removable transaction ID in the
188 : * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
189 : * (or if max_retention_duration is set to 0). During this phase,
190 : * pg_subscription.subretentionactive is updated to true within a new
191 : * transaction, and the worker will be restarted.
192 : *
193 : * The overall state progression is: GET_CANDIDATE_XID ->
194 : * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
195 : * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
196 : * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
197 : *
198 : * Retaining the dead tuples for this period is sufficient for ensuring
199 : * eventual consistency using last-update-wins strategy, as dead tuples are
200 : * useful for detecting conflicts only during the application of concurrent
201 : * transactions from remote nodes. After applying and flushing all remote
202 : * transactions that occurred concurrently with the tuple DELETE, any
203 : * subsequent UPDATE from a remote node should have a later timestamp. In such
204 : * cases, it is acceptable to detect an update_missing scenario and convert the
205 : * UPDATE to an INSERT when applying it. But, for concurrent remote
206 : * transactions with earlier timestamps than the DELETE, detecting
207 : * update_deleted is necessary, as the UPDATEs in remote transactions should be
208 : * ignored if their timestamp is earlier than that of the dead tuples.
209 : *
210 : * Note that advancing the non-removable transaction ID is not supported if the
211 : * publisher is also a physical standby. This is because the logical walsender
212 : * on the standby can only get the WAL replay position but there may be more
213 : * WALs that are being replicated from the primary and those WALs could have
214 : * earlier commit timestamp.
215 : *
216 : * Similarly, when the publisher has subscribed to another publisher,
217 : * information necessary for conflict detection cannot be retained for
218 : * changes from origins other than the publisher. This is because publisher
219 : * lacks the information on concurrent transactions of other publishers to
220 : * which it subscribes. As the information on concurrent transactions is
221 : * unavailable beyond subscriber's immediate publishers, the non-removable
222 : * transaction ID might be advanced prematurely before changes from other
223 : * origins have been fully applied.
224 : *
225 : * XXX Retaining information for changes from other origins might be possible
226 : * by requesting the subscription on that origin to enable retain_dead_tuples
227 : * and fetching the conflict detection slot.xmin along with the publisher's
228 : * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
229 : * wait for the remote slot's xmin to reach the oldest active transaction ID,
230 : * ensuring that all transactions from other origins have been applied on the
231 : * publisher, thereby getting the latest WAL position that includes all
232 : * concurrent changes. However, this approach may impact performance, so it
233 : * might not be worth the effort.
234 : *
235 : * XXX It seems feasible to get the latest commit's WAL location from the
236 : * publisher and wait till that is applied. However, we can't do that
237 : * because commit timestamps can regress as a commit with a later LSN is not
238 : * guaranteed to have a later timestamp than those with earlier LSNs. Having
239 : * said that, even if that is possible, it won't improve performance much as
240 : * the apply always lags and moves slowly as compared with the transactions
241 : * on the publisher.
242 : *-------------------------------------------------------------------------
243 : */
244 :
245 : #include "postgres.h"
246 :
247 : #include <sys/stat.h>
248 : #include <unistd.h>
249 :
250 : #include "access/commit_ts.h"
251 : #include "access/table.h"
252 : #include "access/tableam.h"
253 : #include "access/twophase.h"
254 : #include "access/xact.h"
255 : #include "catalog/indexing.h"
256 : #include "catalog/pg_inherits.h"
257 : #include "catalog/pg_subscription.h"
258 : #include "catalog/pg_subscription_rel.h"
259 : #include "commands/subscriptioncmds.h"
260 : #include "commands/tablecmds.h"
261 : #include "commands/trigger.h"
262 : #include "executor/executor.h"
263 : #include "executor/execPartition.h"
264 : #include "libpq/pqformat.h"
265 : #include "miscadmin.h"
266 : #include "optimizer/optimizer.h"
267 : #include "parser/parse_relation.h"
268 : #include "pgstat.h"
269 : #include "postmaster/bgworker.h"
270 : #include "postmaster/interrupt.h"
271 : #include "postmaster/walwriter.h"
272 : #include "replication/conflict.h"
273 : #include "replication/logicallauncher.h"
274 : #include "replication/logicalproto.h"
275 : #include "replication/logicalrelation.h"
276 : #include "replication/logicalworker.h"
277 : #include "replication/origin.h"
278 : #include "replication/slot.h"
279 : #include "replication/walreceiver.h"
280 : #include "replication/worker_internal.h"
281 : #include "rewrite/rewriteHandler.h"
282 : #include "storage/buffile.h"
283 : #include "storage/ipc.h"
284 : #include "storage/lmgr.h"
285 : #include "storage/procarray.h"
286 : #include "tcop/tcopprot.h"
287 : #include "utils/acl.h"
288 : #include "utils/guc.h"
289 : #include "utils/inval.h"
290 : #include "utils/lsyscache.h"
291 : #include "utils/memutils.h"
292 : #include "utils/pg_lsn.h"
293 : #include "utils/rel.h"
294 : #include "utils/rls.h"
295 : #include "utils/snapmgr.h"
296 : #include "utils/syscache.h"
297 : #include "utils/usercontext.h"
298 :
299 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles, in ms (1s) */
300 :
/*
 * Pairs a local WAL position with a remote one.
 * NOTE(review): entries are presumably chained into lsn_mapping (below) through
 * the node field; the code doing so is outside this excerpt -- confirm.
 */
301 : typedef struct FlushPosition
302 : {
303 : dlist_node node; /* list link */
304 : XLogRecPtr local_end; /* presumably end LSN of the local commit -- confirm */
305 : XLogRecPtr remote_end; /* presumably end LSN of the matching remote transaction -- confirm */
306 : } FlushPosition;
307 :
308 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping); /* statically initialized list head */
309 :
/*
 * Groups the executor state with the replication target relation and, when
 * that relation is partitioned, the state needed for tuple routing.
 */
310 : typedef struct ApplyExecutionData
311 : {
312 : EState *estate; /* executor state, used to track resources */
313 :
314 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
315 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
316 :
317 : /* These fields are used when the target relation is partitioned: */
318 : ModifyTableState *mtstate; /* dummy ModifyTable state */
319 : PartitionTupleRouting *proute; /* partition routing info */
320 : } ApplyExecutionData;
321 :
322 : /* Struct for saving and restoring apply errcontext information (unset values match the static initializer of apply_error_callback_arg) */
323 : typedef struct ApplyErrorCallbackArg
324 : {
325 : LogicalRepMsgType command; /* 0 if invalid */
326 : LogicalRepRelMapEntry *rel; /* NULL in the initial (unset) state */
327 :
328 : /* Remote node information */
329 : int remote_attnum; /* -1 if invalid */
330 : TransactionId remote_xid; /* InvalidTransactionId in the initial state */
331 : XLogRecPtr finish_lsn; /* InvalidXLogRecPtr in the initial state */
332 : char *origin_name; /* NULL in the initial state */
333 : } ApplyErrorCallbackArg;
334 :
335 : /*
336 : * The action to be taken for the changes in the transaction.
337 : *
338 : * TRANS_LEADER_APPLY:
339 : * This action means that we are in the leader apply worker or table sync
340 : * worker. The changes of the transaction are either directly applied or
341 : * are read from temporary files (for streaming transactions) and then
342 : * applied by the worker.
343 : *
344 : * TRANS_LEADER_SERIALIZE:
345 : * This action means that we are in the leader apply worker or table sync
346 : * worker. Changes are written to temporary files and then applied when the
347 : * final commit arrives.
348 : *
349 : * TRANS_LEADER_SEND_TO_PARALLEL:
350 : * This action means that we are in the leader apply worker and need to send
351 : * the changes to the parallel apply worker.
352 : *
353 : * TRANS_LEADER_PARTIAL_SERIALIZE:
354 : * This action means that we are in the leader apply worker and have sent some
355 : * changes directly to the parallel apply worker and the remaining changes are
356 : * serialized to a file, due to timeout while sending data. The parallel apply
357 : * worker will apply these serialized changes when the final commit arrives.
358 : *
359 : * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
360 : * serializing changes, the leader worker also needs to serialize the
361 : * STREAM_XXX message to a file, and wait for the parallel apply worker to
362 : * finish the transaction when processing the transaction finish command. So
363 : * this new action was introduced to keep the code and logic clear.
364 : *
365 : * TRANS_PARALLEL_APPLY:
366 : * This action means that we are in the parallel apply worker and changes of
367 : * the transaction are applied directly by the worker.
368 : */
369 : typedef enum
370 : {
371 : /* Direct apply: non-streaming transactions, or replay of changes read back from temporary files (see above). */
372 : TRANS_LEADER_APPLY,
373 :
374 : /* Actions for streaming transactions. */
375 : TRANS_LEADER_SERIALIZE,
376 : TRANS_LEADER_SEND_TO_PARALLEL,
377 : TRANS_LEADER_PARTIAL_SERIALIZE,
378 : TRANS_PARALLEL_APPLY,
379 : } TransApplyAction;
380 :
381 : /*
382 : * The phases involved in advancing the non-removable transaction ID.
383 : *
384 : * See comments atop worker.c (section RETAIN DEAD TUPLES) for details of the
385 : * transition between these phases; the one-line notes below summarize them.
386 : */
387 : typedef enum
388 : {
389 : RDT_GET_CANDIDATE_XID, /* take oldestRunningXid as the candidate xid */
390 : RDT_REQUEST_PUBLISHER_STATUS, /* ask the walsender for the publisher status */
391 : RDT_WAIT_FOR_PUBLISHER_STATUS, /* wait for that status; re-request while commits are in progress */
392 : RDT_WAIT_FOR_LOCAL_FLUSH, /* advance once local flush reaches the last received WAL position */
393 : RDT_STOP_CONFLICT_INFO_RETENTION, /* only with max_retention_duration: wait exceeded the limit */
394 : RDT_RESUME_CONFLICT_INFO_RETENTION, /* only with max_retention_duration: retention is re-enabled */
395 : } RetainDeadTuplesPhase;
396 :
397 : /*
398 : * State carried across RetainDeadTuplesPhase transitions: the publisher
399 : * status last received, the candidate xid, and timing for the next round.
400 : */
401 : typedef struct RetainDeadTuplesData
402 : {
403 : RetainDeadTuplesPhase phase; /* current phase */
404 : XLogRecPtr remote_lsn; /* WAL write position on the publisher */
405 :
406 : /*
407 : * Oldest transaction ID that was in the commit phase on the publisher.
408 : * Use FullTransactionId to prevent issues with transaction ID wraparound,
409 : * where a new remote_oldestxid could falsely appear to originate from the
410 : * past and block advancement.
411 : */
412 : FullTransactionId remote_oldestxid;
413 :
414 : /*
415 : * Next transaction ID to be assigned on the publisher. Use
416 : * FullTransactionId for consistency and to allow straightforward
417 : * comparisons with remote_oldestxid.
418 : */
419 : FullTransactionId remote_nextxid;
420 :
421 : TimestampTz reply_time; /* when the publisher responds with status */
422 :
423 : /*
424 : * Publisher transaction ID whose completion must be awaited before
425 : * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
426 : * FullTransactionId for the same reason as remote_nextxid.
427 : */
428 : FullTransactionId remote_wait_for;
429 :
430 : TransactionId candidate_xid; /* candidate for the non-removable
431 : * transaction ID */
432 : TimestampTz flushpos_update_time; /* when the remote flush position was
433 : * updated in final phase
434 : * (RDT_WAIT_FOR_LOCAL_FLUSH) */
435 :
436 : long table_sync_wait_time; /* time spent waiting for table sync
437 : * to finish (unit not visible here -- confirm) */
438 :
439 : /*
440 : * The following fields are used to determine the timing for the next
441 : * round of transaction ID advancement.
442 : */
443 : TimestampTz last_recv_time; /* when the last message was received */
444 : TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
445 : int xid_advance_interval; /* how much time (ms) to wait before
446 : * attempting to advance the
447 : * non-removable transaction ID; presumably kept within MIN/MAX_XID_ADVANCE_INTERVAL -- confirm */
448 : } RetainDeadTuplesData;
449 :
450 : /*
451 : * The minimum (100ms) and maximum (3 minutes) intervals for advancing
452 : * non-removable transaction IDs, both expressed in milliseconds. The maximum
453 : * interval is a bit arbitrary but is sufficient to not cause any undue network traffic.
454 : */
455 : #define MIN_XID_ADVANCE_INTERVAL 100 /* 100 ms */
456 : #define MAX_XID_ADVANCE_INTERVAL 180000 /* 3 min = 180000 ms */
457 :
458 : /* errcontext tracker; every field starts out in its invalid/unset state */
459 : static ApplyErrorCallbackArg apply_error_callback_arg =
460 : {
461 : .command = 0,
462 : .rel = NULL,
463 : .remote_attnum = -1,
464 : .remote_xid = InvalidTransactionId,
465 : .finish_lsn = InvalidXLogRecPtr,
466 : .origin_name = NULL,
467 : };
468 :
469 : ErrorContextCallback *apply_error_context_stack = NULL;
470 :
471 : MemoryContext ApplyMessageContext = NULL; /* begin_replication_step ensures we run in this context */
472 : MemoryContext ApplyContext = NULL;
473 :
474 : /* per stream context for streaming transactions */
475 : static MemoryContext LogicalStreamingContext = NULL;
476 :
477 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL; /* presumably the walreceiver connection to the publisher -- confirm */
478 :
479 : Subscription *MySubscription = NULL;
480 : static bool MySubscriptionValid = false;
481 :
482 : static List *on_commit_wakeup_workers_subids = NIL; /* NOTE(review): by its name, subscription OIDs whose workers get woken at commit -- confirm */
483 :
484 : bool in_remote_transaction = false;
485 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; /* set in apply_handle_begin; compared with statelsn in should_apply_changes_for_rel */
486 :
487 : /* fields valid only when processing streamed transaction */
488 : static bool in_streamed_transaction = false;
489 :
490 : static TransactionId stream_xid = InvalidTransactionId;
491 :
492 : /*
493 : * The number of changes applied by parallel apply worker during one streaming
494 : * block.
495 : */
496 : static uint32 parallel_stream_nchanges = 0;
497 :
498 : /* Are we initializing an apply worker? */
499 : bool InitializingApplyWorker = false;
500 :
501 : /*
502 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
503 : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
504 : * Once we start skipping changes, we don't stop it until we skip all changes of
505 : * the transaction even if pg_subscription is updated and MySubscription->skiplsn
506 : * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
507 : * we don't skip receiving and spooling the changes since we decide whether or not
508 : * to skip applying the changes when starting to apply changes. The subskiplsn is
509 : * cleared after successfully skipping the transaction or applying non-empty
510 : * transaction. The latter prevents a mistakenly specified subskiplsn from
511 : * being left behind. Note that we cannot skip the streaming transactions when using
512 : * parallel apply workers because we cannot get the finish LSN before applying
513 : * the changes. So, we don't start parallel apply worker when finish LSN is set
514 : * by the user.
515 : */
516 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; /* InvalidXLogRecPtr means we are not skipping */
517 : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn))) /* true while skip_xact_finish_lsn is valid */
518 :
519 : /* BufFile handle of the current streaming file */
520 : static BufFile *stream_fd = NULL;
521 :
522 : /*
523 : * The remote WAL position that has been applied and flushed locally. We record
524 : * and use this information both while sending feedback to the server and
525 : * advancing oldest_nonremovable_xid.
526 : */
527 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
528 :
/*
 * Position (file number plus offset) recorded for one subtransaction. Per the
 * notes atop this file, these offsets are what is used to truncate the file
 * with serialized changes when a subtransaction aborts.
 */
529 : typedef struct SubXactInfo
530 : {
531 : TransactionId xid; /* XID of the subxact */
532 : int fileno; /* file number in the buffile */
533 : off_t offset; /* offset in the file */
534 : } SubXactInfo;
535 :
536 : /* Sub-transaction data for the current streaming transaction */
537 : typedef struct ApplySubXactData
538 : {
539 : uint32 nsubxacts; /* number of sub-transactions */
540 : uint32 nsubxacts_max; /* current capacity of subxacts */
541 : TransactionId subxact_last; /* xid of the last sub-transaction */
542 : SubXactInfo *subxacts; /* sub-xact offset in changes file */
543 : } ApplySubXactData;
544 :
545 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; /* positional initializer: keep in step with the field order above */
546 :
/*
 * File name helpers for streamed transactions. Per the notes atop this file,
 * the names include the subscription OID and the toplevel XID; presumably the
 * result is written into path -- confirm against the definitions.
 */
547 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
548 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
549 :
550 : /*
551 : * Write, read, extend and clean up the information about subtransactions of a given toplevel transaction.
552 : */
553 : static void subxact_info_write(Oid subid, TransactionId xid);
554 : static void subxact_info_read(Oid subid, TransactionId xid);
555 : static void subxact_info_add(TransactionId xid);
556 : static inline void cleanup_subxact_info(void);
557 :
558 : /*
559 : * Serialize and deserialize changes for a toplevel transaction.
560 : */
561 : static void stream_open_file(Oid subid, TransactionId xid,
562 : bool first_segment);
563 : static void stream_write_change(char action, StringInfo s);
564 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
565 : static void stream_close_file(void);
566 :
567 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
568 :
/*
 * Helpers operating on RetainDeadTuplesData. Several of their names mirror the
 * RDT_* phases described in the RETAIN DEAD TUPLES section atop this file.
 */
569 : static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
570 : bool status_received);
571 : static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
572 : static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
573 : bool status_received);
574 : static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
575 : static void request_publisher_status(RetainDeadTuplesData *rdt_data);
576 : static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
577 : bool status_received);
578 : static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
579 : static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
580 : static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
581 : static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data);
582 : static bool update_retention_status(bool active);
583 : static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
584 : static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
585 : bool new_xid_found);
586 :
587 : static void apply_worker_exit(void);
588 :
/*
 * NOTE(review): judging by their names, the following are the internal helpers
 * for applying commit/insert/update/delete and for locating the matching local
 * tuple; their definitions are outside this excerpt -- confirm there.
 */
589 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
590 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
591 : ResultRelInfo *relinfo,
592 : TupleTableSlot *remoteslot);
593 : static void apply_handle_update_internal(ApplyExecutionData *edata,
594 : ResultRelInfo *relinfo,
595 : TupleTableSlot *remoteslot,
596 : LogicalRepTupleData *newtup,
597 : Oid localindexoid);
598 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
599 : ResultRelInfo *relinfo,
600 : TupleTableSlot *remoteslot,
601 : Oid localindexoid);
602 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
603 : LogicalRepRelation *remoterel,
604 : Oid localidxoid,
605 : TupleTableSlot *remoteslot,
606 : TupleTableSlot **localslot);
607 : static bool FindDeletedTupleInLocalRel(Relation localrel,
608 : Oid localidxoid,
609 : TupleTableSlot *remoteslot,
610 : TransactionId *delete_xid,
611 : RepOriginId *delete_origin,
612 : TimestampTz *delete_time);
613 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
614 : TupleTableSlot *remoteslot,
615 : LogicalRepTupleData *newtup,
616 : CmdType operation);
617 :
618 : /* Functions for skipping changes (driven by skip_xact_finish_lsn) */
619 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
620 : static void stop_skipping_changes(void);
621 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
622 :
623 : /* Functions for apply error callback (they maintain apply_error_callback_arg -- presumably; confirm) */
624 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
625 : static inline void reset_apply_error_context_info(void);
626 :
627 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
628 : ParallelApplyWorkerInfo **winfo);
629 :
630 : static void replorigin_reset(int code, Datum arg); /* NOTE(review): (int, Datum) looks like an exit-callback signature -- confirm where it is registered */
631 :
632 : /*
633 : * Form the replication origin name for the subscription with OID suboid.
634 : *
635 : * This is a common function for tablesync and other workers. Tablesync workers
636 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
637 : *
638 : * The name is written into originname with snprintf(), bounded by szoriginname.
639 : */
640 : void
641 2612 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
642 : char *originname, Size szoriginname)
643 : {
644 2612 : if (OidIsValid(relid))
645 : {
646 : /* Replication origin name for tablesync workers: pg_<suboid>_<relid>. */
647 1500 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
648 : }
649 : else
650 : {
651 : /* Replication origin name for non-tablesync workers: pg_<suboid>. */
652 1112 : snprintf(originname, szoriginname, "pg_%u", suboid);
653 : }
654 2612 : }
655 :
656 : /*
657 : * Should this worker apply changes for the given relation? Returns true if so.
658 : *
659 : * This is mainly needed for initial relation data sync as that runs in
660 : * separate worker process running in parallel and we need some way to skip
661 : * changes coming to the leader apply worker during the sync of a table.
662 : *
663 : * Note we need to do smaller or equals comparison for SYNCDONE state because
664 : * it might hold position of end of initial slot consistent point WAL
665 : * record + 1 (ie start of next record) and next record can be COMMIT of
666 : * transaction we are now processing (which is what we set remote_final_lsn
667 : * to in apply_handle_begin).
668 : *
669 : * Note that for streaming transactions that are being applied in the parallel
670 : * apply worker, we disallow applying changes if the target table in the
671 : * subscription is not in the READY state, because we cannot decide whether to
672 : * apply the change as we won't know remote_final_lsn by that time.
673 : *
674 : * We already checked this in pa_can_start() before assigning the
675 : * streaming transaction to the parallel worker, but it also needs to be
676 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
677 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
678 : * while applying this transaction.
679 : */
680 : static bool
681 296824 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
682 : {
683 296824 : switch (MyLogicalRepWorker->type)
684 : {
685 4 : case WORKERTYPE_TABLESYNC:
686 4 : return MyLogicalRepWorker->relid == rel->localreloid; /* true only when rel is the one named by this worker's relid */
687 :
688 127310 : case WORKERTYPE_PARALLEL_APPLY:
689 : /* Only READY or UNKNOWN rels are tolerated here; UNKNOWN ones are not synchronized, so false is returned for them below. */
690 127310 : if (rel->state != SUBREL_STATE_READY &&
691 0 : rel->state != SUBREL_STATE_UNKNOWN)
692 0 : ereport(ERROR,
693 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
694 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
695 : MySubscription->name),
696 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
697 :
698 127310 : return rel->state == SUBREL_STATE_READY;
699 :
700 169510 : case WORKERTYPE_APPLY:
701 169632 : return (rel->state == SUBREL_STATE_READY ||
702 122 : (rel->state == SUBREL_STATE_SYNCDONE &&
703 24 : rel->statelsn <= remote_final_lsn)); /* less-or-equal on purpose: see the SYNCDONE note in the header */
704 :
705 0 : case WORKERTYPE_UNKNOWN:
706 : /* Should never happen. */
707 0 : elog(ERROR, "Unknown worker type");
708 : }
709 :
710 0 : return false; /* dummy for compiler */
711 : }
712 :
713 : /*
714 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
715 : *
716 : * Start a transaction, if this is the first step (else we keep using the
717 : * existing transaction).
718 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
719 : */
720 : static void
721 297734 : begin_replication_step(void)
722 : {
723 297734 : SetCurrentStatementStartTimestamp();
724 :
725 297734 : if (!IsTransactionState())
726 : {
727 1904 : StartTransactionCommand();
728 1904 : maybe_reread_subscription();
729 : }
730 :
731 297728 : PushActiveSnapshot(GetTransactionSnapshot());
732 :
733 297728 : MemoryContextSwitchTo(ApplyMessageContext);
734 297728 : }
735 :
/*
 * Finish up one step of a replication transaction.
 * Callers of begin_replication_step() must also call this.
 *
 * We don't close out the transaction here, but we should increment
 * the command counter to make the effects of this step visible.
 *
 * Takes no arguments and returns nothing; committing the transaction is
 * left to the caller.
 */
static void
end_replication_step(void)
{
	/*
	 * Drop the active snapshot; this balances the PushActiveSnapshot() done
	 * in begin_replication_step().
	 */
	PopActiveSnapshot();

	/* Make this step's changes visible to later steps of the transaction. */
	CommandCounterIncrement();
}
750 :
751 : /*
752 : * Handle streamed transactions for both the leader apply worker and the
753 : * parallel apply workers.
754 : *
755 : * In the streaming case (receiving a block of the streamed transaction), for
756 : * serialize mode, simply redirect it to a file for the proper toplevel
757 : * transaction, and for parallel mode, the leader apply worker will send the
758 : * changes to parallel apply workers and the parallel apply worker will define
759 : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
760 : * messages will be applied by both leader apply worker and parallel apply
761 : * workers).
762 : *
763 : * Returns true for streamed transactions (when the change is either serialized
764 : * to file or sent to parallel apply worker), false otherwise (regular mode or
765 : * needs to be processed by parallel apply worker).
766 : *
767 : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
768 : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
769 : * to a parallel apply worker.
770 : */
771 : static bool
772 639462 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
773 : {
774 : TransactionId current_xid;
775 : ParallelApplyWorkerInfo *winfo;
776 : TransApplyAction apply_action;
777 : StringInfoData original_msg;
778 :
779 639462 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
780 :
781 : /* not in streaming mode */
782 639462 : if (apply_action == TRANS_LEADER_APPLY)
783 170290 : return false;
784 :
785 : Assert(TransactionIdIsValid(stream_xid));
786 :
787 : /*
788 : * The parallel apply worker needs the xid in this message to decide
789 : * whether to define a savepoint, so save the original message that has
790 : * not moved the cursor after the xid. We will serialize this message to a
791 : * file in PARTIAL_SERIALIZE mode.
792 : */
793 469172 : original_msg = *s;
794 :
795 : /*
796 : * We should have received XID of the subxact as the first part of the
797 : * message, so extract it.
798 : */
799 469172 : current_xid = pq_getmsgint(s, 4);
800 :
801 469172 : if (!TransactionIdIsValid(current_xid))
802 0 : ereport(ERROR,
803 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
804 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
805 :
806 469172 : switch (apply_action)
807 : {
808 205026 : case TRANS_LEADER_SERIALIZE:
809 : Assert(stream_fd);
810 :
811 : /* Add the new subxact to the array (unless already there). */
812 205026 : subxact_info_add(current_xid);
813 :
814 : /* Write the change to the current file */
815 205026 : stream_write_change(action, s);
816 205026 : return true;
817 :
818 126770 : case TRANS_LEADER_SEND_TO_PARALLEL:
819 : Assert(winfo);
820 :
821 : /*
822 : * XXX The publisher side doesn't always send relation/type update
823 : * messages after the streaming transaction, so also update the
824 : * relation/type in leader apply worker. See function
825 : * cleanup_rel_sync_cache.
826 : */
827 126770 : if (pa_send_data(winfo, s->len, s->data))
828 126770 : return (action != LOGICAL_REP_MSG_RELATION &&
829 : action != LOGICAL_REP_MSG_TYPE);
830 :
831 : /*
832 : * Switch to serialize mode when we are not able to send the
833 : * change to parallel apply worker.
834 : */
835 0 : pa_switch_to_partial_serialize(winfo, false);
836 :
837 : /* fall through */
838 10012 : case TRANS_LEADER_PARTIAL_SERIALIZE:
839 10012 : stream_write_change(action, &original_msg);
840 :
841 : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
842 10012 : return (action != LOGICAL_REP_MSG_RELATION &&
843 : action != LOGICAL_REP_MSG_TYPE);
844 :
845 127364 : case TRANS_PARALLEL_APPLY:
846 127364 : parallel_stream_nchanges += 1;
847 :
848 : /* Define a savepoint for a subxact if needed. */
849 127364 : pa_start_subtrans(current_xid, stream_xid);
850 127364 : return false;
851 :
852 0 : default:
853 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
854 : return false; /* silence compiler warning */
855 : }
856 : }
857 :
858 : /*
859 : * Executor state preparation for evaluation of constraint expressions,
860 : * indexes and triggers for the specified relation.
861 : *
862 : * Note that the caller must open and close any indexes to be updated.
863 : */
864 : static ApplyExecutionData *
865 296666 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
866 : {
867 : ApplyExecutionData *edata;
868 : EState *estate;
869 : RangeTblEntry *rte;
870 296666 : List *perminfos = NIL;
871 : ResultRelInfo *resultRelInfo;
872 :
873 296666 : edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
874 296666 : edata->targetRel = rel;
875 :
876 296666 : edata->estate = estate = CreateExecutorState();
877 :
878 296666 : rte = makeNode(RangeTblEntry);
879 296666 : rte->rtekind = RTE_RELATION;
880 296666 : rte->relid = RelationGetRelid(rel->localrel);
881 296666 : rte->relkind = rel->localrel->rd_rel->relkind;
882 296666 : rte->rellockmode = AccessShareLock;
883 :
884 296666 : addRTEPermissionInfo(&perminfos, rte);
885 :
886 296666 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
887 : bms_make_singleton(1));
888 :
889 296666 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
890 :
891 : /*
892 : * Use Relation opened by logicalrep_rel_open() instead of opening it
893 : * again.
894 : */
895 296666 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
896 :
897 : /*
898 : * We put the ResultRelInfo in the es_opened_result_relations list, even
899 : * though we don't populate the es_result_relations array. That's a bit
900 : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
901 : *
902 : * ExecOpenIndices() is not called here either, each execution path doing
903 : * an apply operation being responsible for that.
904 : */
905 296666 : estate->es_opened_result_relations =
906 296666 : lappend(estate->es_opened_result_relations, resultRelInfo);
907 :
908 296666 : estate->es_output_cid = GetCurrentCommandId(true);
909 :
910 : /* Prepare to catch AFTER triggers. */
911 296666 : AfterTriggerBeginQuery();
912 :
913 : /* other fields of edata remain NULL for now */
914 :
915 296666 : return edata;
916 : }
917 :
918 : /*
919 : * Finish any operations related to the executor state created by
920 : * create_edata_for_relation().
921 : */
922 : static void
923 296602 : finish_edata(ApplyExecutionData *edata)
924 : {
925 296602 : EState *estate = edata->estate;
926 :
927 : /* Handle any queued AFTER triggers. */
928 296602 : AfterTriggerEndQuery(estate);
929 :
930 : /* Shut down tuple routing, if any was done. */
931 296602 : if (edata->proute)
932 148 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
933 :
934 : /*
935 : * Cleanup. It might seem that we should call ExecCloseResultRelations()
936 : * here, but we intentionally don't. It would close the rel we added to
937 : * es_opened_result_relations above, which is wrong because we took no
938 : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
939 : * any other relations opened during execution.
940 : */
941 296602 : ExecResetTupleTable(estate->es_tupleTable, false);
942 296602 : FreeExecutorState(estate);
943 296602 : pfree(edata);
944 296602 : }
945 :
946 : /*
947 : * Executes default values for columns for which we can't map to remote
948 : * relation columns.
949 : *
950 : * This allows us to support tables which have more columns on the downstream
951 : * than on the upstream.
952 : */
953 : static void
954 152146 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
955 : TupleTableSlot *slot)
956 : {
957 152146 : TupleDesc desc = RelationGetDescr(rel->localrel);
958 152146 : int num_phys_attrs = desc->natts;
959 : int i;
960 : int attnum,
961 152146 : num_defaults = 0;
962 : int *defmap;
963 : ExprState **defexprs;
964 : ExprContext *econtext;
965 :
966 152146 : econtext = GetPerTupleExprContext(estate);
967 :
968 : /* We got all the data via replication, no need to evaluate anything. */
969 152146 : if (num_phys_attrs == rel->remoterel.natts)
970 71856 : return;
971 :
972 80290 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
973 80290 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
974 :
975 : Assert(rel->attrmap->maplen == num_phys_attrs);
976 421326 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
977 : {
978 341036 : CompactAttribute *cattr = TupleDescCompactAttr(desc, attnum);
979 : Expr *defexpr;
980 :
981 341036 : if (cattr->attisdropped || cattr->attgenerated)
982 18 : continue;
983 :
984 341018 : if (rel->attrmap->attnums[attnum] >= 0)
985 184536 : continue;
986 :
987 156482 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
988 :
989 156482 : if (defexpr != NULL)
990 : {
991 : /* Run the expression through planner */
992 140262 : defexpr = expression_planner(defexpr);
993 :
994 : /* Initialize executable expression in copycontext */
995 140262 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
996 140262 : defmap[num_defaults] = attnum;
997 140262 : num_defaults++;
998 : }
999 : }
1000 :
1001 220552 : for (i = 0; i < num_defaults; i++)
1002 140262 : slot->tts_values[defmap[i]] =
1003 140262 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1004 : }
1005 :
1006 : /*
1007 : * Store tuple data into slot.
1008 : *
1009 : * Incoming data can be either text or binary format.
1010 : */
1011 : static void
1012 296694 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
1013 : LogicalRepTupleData *tupleData)
1014 : {
1015 296694 : int natts = slot->tts_tupleDescriptor->natts;
1016 : int i;
1017 :
1018 296694 : ExecClearTuple(slot);
1019 :
1020 : /* Call the "in" function for each non-dropped, non-null attribute */
1021 : Assert(natts == rel->attrmap->maplen);
1022 1316416 : for (i = 0; i < natts; i++)
1023 : {
1024 1019722 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1025 1019722 : int remoteattnum = rel->attrmap->attnums[i];
1026 :
1027 1019722 : if (!att->attisdropped && remoteattnum >= 0)
1028 605888 : {
1029 605888 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1030 :
1031 : Assert(remoteattnum < tupleData->ncols);
1032 :
1033 : /* Set attnum for error callback */
1034 605888 : apply_error_callback_arg.remote_attnum = remoteattnum;
1035 :
1036 605888 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1037 : {
1038 : Oid typinput;
1039 : Oid typioparam;
1040 :
1041 284684 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1042 569368 : slot->tts_values[i] =
1043 284684 : OidInputFunctionCall(typinput, colvalue->data,
1044 : typioparam, att->atttypmod);
1045 284684 : slot->tts_isnull[i] = false;
1046 : }
1047 321204 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1048 : {
1049 : Oid typreceive;
1050 : Oid typioparam;
1051 :
1052 : /*
1053 : * In some code paths we may be asked to re-parse the same
1054 : * tuple data. Reset the StringInfo's cursor so that works.
1055 : */
1056 220540 : colvalue->cursor = 0;
1057 :
1058 220540 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1059 441080 : slot->tts_values[i] =
1060 220540 : OidReceiveFunctionCall(typreceive, colvalue,
1061 : typioparam, att->atttypmod);
1062 :
1063 : /* Trouble if it didn't eat the whole buffer */
1064 220540 : if (colvalue->cursor != colvalue->len)
1065 0 : ereport(ERROR,
1066 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1067 : errmsg("incorrect binary data format in logical replication column %d",
1068 : remoteattnum + 1)));
1069 220540 : slot->tts_isnull[i] = false;
1070 : }
1071 : else
1072 : {
1073 : /*
1074 : * NULL value from remote. (We don't expect to see
1075 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1076 : * NULL.)
1077 : */
1078 100664 : slot->tts_values[i] = (Datum) 0;
1079 100664 : slot->tts_isnull[i] = true;
1080 : }
1081 :
1082 : /* Reset attnum for error callback */
1083 605888 : apply_error_callback_arg.remote_attnum = -1;
1084 : }
1085 : else
1086 : {
1087 : /*
1088 : * We assign NULL to dropped attributes and missing values
1089 : * (missing values should be later filled using
1090 : * slot_fill_defaults).
1091 : */
1092 413834 : slot->tts_values[i] = (Datum) 0;
1093 413834 : slot->tts_isnull[i] = true;
1094 : }
1095 : }
1096 :
1097 296694 : ExecStoreVirtualTuple(slot);
1098 296694 : }
1099 :
1100 : /*
1101 : * Replace updated columns with data from the LogicalRepTupleData struct.
1102 : * This is somewhat similar to heap_modify_tuple but also calls the type
1103 : * input functions on the user data.
1104 : *
1105 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
1106 : * columns provided in "tupleData" and leaving others as-is.
1107 : *
1108 : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
1109 : * storage for "srcslot". This is OK for current usage, but someday we may
1110 : * need to materialize "slot" at the end to make it independent of "srcslot".
1111 : */
1112 : static void
1113 63848 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
1114 : LogicalRepRelMapEntry *rel,
1115 : LogicalRepTupleData *tupleData)
1116 : {
1117 63848 : int natts = slot->tts_tupleDescriptor->natts;
1118 : int i;
1119 :
1120 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1121 63848 : ExecClearTuple(slot);
1122 :
1123 : /*
1124 : * Copy all the column data from srcslot, so that we'll have valid values
1125 : * for unreplaced columns.
1126 : */
1127 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
1128 63848 : slot_getallattrs(srcslot);
1129 63848 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1130 63848 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1131 :
1132 : /* Call the "in" function for each replaced attribute */
1133 : Assert(natts == rel->attrmap->maplen);
1134 318560 : for (i = 0; i < natts; i++)
1135 : {
1136 254712 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1137 254712 : int remoteattnum = rel->attrmap->attnums[i];
1138 :
1139 254712 : if (remoteattnum < 0)
1140 117038 : continue;
1141 :
1142 : Assert(remoteattnum < tupleData->ncols);
1143 :
1144 137674 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1145 : {
1146 137668 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1147 :
1148 : /* Set attnum for error callback */
1149 137668 : apply_error_callback_arg.remote_attnum = remoteattnum;
1150 :
1151 137668 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1152 : {
1153 : Oid typinput;
1154 : Oid typioparam;
1155 :
1156 50860 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1157 101720 : slot->tts_values[i] =
1158 50860 : OidInputFunctionCall(typinput, colvalue->data,
1159 : typioparam, att->atttypmod);
1160 50860 : slot->tts_isnull[i] = false;
1161 : }
1162 86808 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1163 : {
1164 : Oid typreceive;
1165 : Oid typioparam;
1166 :
1167 : /*
1168 : * In some code paths we may be asked to re-parse the same
1169 : * tuple data. Reset the StringInfo's cursor so that works.
1170 : */
1171 86712 : colvalue->cursor = 0;
1172 :
1173 86712 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1174 173424 : slot->tts_values[i] =
1175 86712 : OidReceiveFunctionCall(typreceive, colvalue,
1176 : typioparam, att->atttypmod);
1177 :
1178 : /* Trouble if it didn't eat the whole buffer */
1179 86712 : if (colvalue->cursor != colvalue->len)
1180 0 : ereport(ERROR,
1181 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1182 : errmsg("incorrect binary data format in logical replication column %d",
1183 : remoteattnum + 1)));
1184 86712 : slot->tts_isnull[i] = false;
1185 : }
1186 : else
1187 : {
1188 : /* must be LOGICALREP_COLUMN_NULL */
1189 96 : slot->tts_values[i] = (Datum) 0;
1190 96 : slot->tts_isnull[i] = true;
1191 : }
1192 :
1193 : /* Reset attnum for error callback */
1194 137668 : apply_error_callback_arg.remote_attnum = -1;
1195 : }
1196 : }
1197 :
1198 : /* And finally, declare that "slot" contains a valid virtual tuple */
1199 63848 : ExecStoreVirtualTuple(slot);
1200 63848 : }
1201 :
1202 : /*
1203 : * Handle BEGIN message.
1204 : */
1205 : static void
1206 958 : apply_handle_begin(StringInfo s)
1207 : {
1208 : LogicalRepBeginData begin_data;
1209 :
1210 : /* There must not be an active streaming transaction. */
1211 : Assert(!TransactionIdIsValid(stream_xid));
1212 :
1213 958 : logicalrep_read_begin(s, &begin_data);
1214 958 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1215 :
1216 958 : remote_final_lsn = begin_data.final_lsn;
1217 :
1218 958 : maybe_start_skipping_changes(begin_data.final_lsn);
1219 :
1220 958 : in_remote_transaction = true;
1221 :
1222 958 : pgstat_report_activity(STATE_RUNNING, NULL);
1223 958 : }
1224 :
1225 : /*
1226 : * Handle COMMIT message.
1227 : *
1228 : * TODO, support tracking of multiple origins
1229 : */
1230 : static void
1231 876 : apply_handle_commit(StringInfo s)
1232 : {
1233 : LogicalRepCommitData commit_data;
1234 :
1235 876 : logicalrep_read_commit(s, &commit_data);
1236 :
1237 876 : if (commit_data.commit_lsn != remote_final_lsn)
1238 0 : ereport(ERROR,
1239 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1240 : errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1241 : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1242 : LSN_FORMAT_ARGS(remote_final_lsn))));
1243 :
1244 876 : apply_handle_commit_internal(&commit_data);
1245 :
1246 : /* Process any tables that are being synchronized in parallel. */
1247 876 : ProcessSyncingRelations(commit_data.end_lsn);
1248 :
1249 874 : pgstat_report_activity(STATE_IDLE, NULL);
1250 874 : reset_apply_error_context_info();
1251 874 : }
1252 :
1253 : /*
1254 : * Handle BEGIN PREPARE message.
1255 : */
1256 : static void
1257 32 : apply_handle_begin_prepare(StringInfo s)
1258 : {
1259 : LogicalRepPreparedTxnData begin_data;
1260 :
1261 : /* Tablesync should never receive prepare. */
1262 32 : if (am_tablesync_worker())
1263 0 : ereport(ERROR,
1264 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1265 : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1266 :
1267 : /* There must not be an active streaming transaction. */
1268 : Assert(!TransactionIdIsValid(stream_xid));
1269 :
1270 32 : logicalrep_read_begin_prepare(s, &begin_data);
1271 32 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1272 :
1273 32 : remote_final_lsn = begin_data.prepare_lsn;
1274 :
1275 32 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1276 :
1277 32 : in_remote_transaction = true;
1278 :
1279 32 : pgstat_report_activity(STATE_RUNNING, NULL);
1280 32 : }
1281 :
1282 : /*
1283 : * Common function to prepare the GID.
1284 : */
1285 : static void
1286 46 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1287 : {
1288 : char gid[GIDSIZE];
1289 :
1290 : /*
1291 : * Compute unique GID for two_phase transactions. We don't use GID of
1292 : * prepared transaction sent by server as that can lead to deadlock when
1293 : * we have multiple subscriptions from same node point to publications on
1294 : * the same node. See comments atop worker.c
1295 : */
1296 46 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1297 : gid, sizeof(gid));
1298 :
1299 : /*
1300 : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1301 : * called within the PrepareTransactionBlock below.
1302 : */
1303 46 : if (!IsTransactionBlock())
1304 : {
1305 46 : BeginTransactionBlock();
1306 46 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1307 : }
1308 :
1309 : /*
1310 : * Update origin state so we can restart streaming from correct position
1311 : * in case of crash.
1312 : */
1313 46 : replorigin_session_origin_lsn = prepare_data->end_lsn;
1314 46 : replorigin_session_origin_timestamp = prepare_data->prepare_time;
1315 :
1316 46 : PrepareTransactionBlock(gid);
1317 46 : }
1318 :
1319 : /*
1320 : * Handle PREPARE message.
1321 : */
1322 : static void
1323 30 : apply_handle_prepare(StringInfo s)
1324 : {
1325 : LogicalRepPreparedTxnData prepare_data;
1326 :
1327 30 : logicalrep_read_prepare(s, &prepare_data);
1328 :
1329 30 : if (prepare_data.prepare_lsn != remote_final_lsn)
1330 0 : ereport(ERROR,
1331 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1332 : errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1333 : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1334 : LSN_FORMAT_ARGS(remote_final_lsn))));
1335 :
1336 : /*
1337 : * Unlike commit, here, we always prepare the transaction even though no
1338 : * change has happened in this transaction or all changes are skipped. It
1339 : * is done this way because at commit prepared time, we won't know whether
1340 : * we have skipped preparing a transaction because of those reasons.
1341 : *
1342 : * XXX, We can optimize such that at commit prepared time, we first check
1343 : * whether we have prepared the transaction or not but that doesn't seem
1344 : * worthwhile because such cases shouldn't be common.
1345 : */
1346 30 : begin_replication_step();
1347 :
1348 30 : apply_handle_prepare_internal(&prepare_data);
1349 :
1350 30 : end_replication_step();
1351 30 : CommitTransactionCommand();
1352 28 : pgstat_report_stat(false);
1353 :
1354 : /*
1355 : * It is okay not to set the local_end LSN for the prepare because we
1356 : * always flush the prepare record. So, we can send the acknowledgment of
1357 : * the remote_end LSN as soon as prepare is finished.
1358 : *
1359 : * XXX For the sake of consistency with commit, we could have set it with
1360 : * the LSN of prepare but as of now we don't track that value similar to
1361 : * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
1362 : * it.
1363 : */
1364 28 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1365 :
1366 28 : in_remote_transaction = false;
1367 :
1368 : /* Process any tables that are being synchronized in parallel. */
1369 28 : ProcessSyncingRelations(prepare_data.end_lsn);
1370 :
1371 : /*
1372 : * Since we have already prepared the transaction, in a case where the
1373 : * server crashes before clearing the subskiplsn, it will be left but the
1374 : * transaction won't be resent. But that's okay because it's a rare case
1375 : * and the subskiplsn will be cleared when finishing the next transaction.
1376 : */
1377 28 : stop_skipping_changes();
1378 28 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1379 :
1380 28 : pgstat_report_activity(STATE_IDLE, NULL);
1381 28 : reset_apply_error_context_info();
1382 28 : }
1383 :
1384 : /*
1385 : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1386 : *
1387 : * Note that we don't need to wait here if the transaction was prepared in a
1388 : * parallel apply worker. In that case, we have already waited for the prepare
1389 : * to finish in apply_handle_stream_prepare() which will ensure all the
1390 : * operations in that transaction have happened in the subscriber, so no
1391 : * concurrent transaction can cause deadlock or transaction dependency issues.
1392 : */
1393 : static void
1394 40 : apply_handle_commit_prepared(StringInfo s)
1395 : {
1396 : LogicalRepCommitPreparedTxnData prepare_data;
1397 : char gid[GIDSIZE];
1398 :
1399 40 : logicalrep_read_commit_prepared(s, &prepare_data);
1400 40 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1401 :
1402 : /* Compute GID for two_phase transactions. */
1403 40 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1404 : gid, sizeof(gid));
1405 :
1406 : /* There is no transaction when COMMIT PREPARED is called */
1407 40 : begin_replication_step();
1408 :
1409 : /*
1410 : * Update origin state so we can restart streaming from correct position
1411 : * in case of crash.
1412 : */
1413 40 : replorigin_session_origin_lsn = prepare_data.end_lsn;
1414 40 : replorigin_session_origin_timestamp = prepare_data.commit_time;
1415 :
1416 40 : FinishPreparedTransaction(gid, true);
1417 40 : end_replication_step();
1418 40 : CommitTransactionCommand();
1419 40 : pgstat_report_stat(false);
1420 :
1421 40 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1422 40 : in_remote_transaction = false;
1423 :
1424 : /* Process any tables that are being synchronized in parallel. */
1425 40 : ProcessSyncingRelations(prepare_data.end_lsn);
1426 :
1427 40 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1428 :
1429 40 : pgstat_report_activity(STATE_IDLE, NULL);
1430 40 : reset_apply_error_context_info();
1431 40 : }
1432 :
1433 : /*
1434 : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1435 : *
1436 : * Note that we don't need to wait here if the transaction was prepared in a
1437 : * parallel apply worker. In that case, we have already waited for the prepare
1438 : * to finish in apply_handle_stream_prepare() which will ensure all the
1439 : * operations in that transaction have happened in the subscriber, so no
1440 : * concurrent transaction can cause deadlock or transaction dependency issues.
1441 : */
1442 : static void
1443 10 : apply_handle_rollback_prepared(StringInfo s)
1444 : {
1445 : LogicalRepRollbackPreparedTxnData rollback_data;
1446 : char gid[GIDSIZE];
1447 :
1448 10 : logicalrep_read_rollback_prepared(s, &rollback_data);
1449 10 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1450 :
1451 : /* Compute GID for two_phase transactions. */
1452 10 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1453 : gid, sizeof(gid));
1454 :
1455 : /*
1456 : * It is possible that we haven't received prepare because it occurred
1457 : * before walsender reached a consistent point or the two_phase was still
1458 : * not enabled by that time, so in such cases, we need to skip rollback
1459 : * prepared.
1460 : */
1461 10 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1462 : rollback_data.prepare_time))
1463 : {
1464 : /*
1465 : * Update origin state so we can restart streaming from correct
1466 : * position in case of crash.
1467 : */
1468 10 : replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1469 10 : replorigin_session_origin_timestamp = rollback_data.rollback_time;
1470 :
1471 : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1472 10 : begin_replication_step();
1473 10 : FinishPreparedTransaction(gid, false);
1474 10 : end_replication_step();
1475 10 : CommitTransactionCommand();
1476 :
1477 10 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1478 : }
1479 :
1480 10 : pgstat_report_stat(false);
1481 :
1482 : /*
1483 : * It is okay not to set the local_end LSN for the rollback of prepared
1484 : * transaction because we always flush the WAL record for it. See
1485 : * apply_handle_prepare.
1486 : */
1487 10 : store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
1488 10 : in_remote_transaction = false;
1489 :
1490 : /* Process any tables that are being synchronized in parallel. */
1491 10 : ProcessSyncingRelations(rollback_data.rollback_end_lsn);
1492 :
1493 10 : pgstat_report_activity(STATE_IDLE, NULL);
1494 10 : reset_apply_error_context_info();
1495 10 : }
1496 :
1497 : /*
1498 : * Handle STREAM PREPARE.
1499 : */
1500 : static void
1501 22 : apply_handle_stream_prepare(StringInfo s)
1502 : {
1503 : LogicalRepPreparedTxnData prepare_data;
1504 : ParallelApplyWorkerInfo *winfo;
1505 : TransApplyAction apply_action;
1506 :
1507 : /* Save the message before it is consumed. */
1508 22 : StringInfoData original_msg = *s;
1509 :
1510 22 : if (in_streamed_transaction)
1511 0 : ereport(ERROR,
1512 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1513 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1514 :
1515 : /* Tablesync should never receive prepare. */
1516 22 : if (am_tablesync_worker())
1517 0 : ereport(ERROR,
1518 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1519 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1520 :
1521 22 : logicalrep_read_stream_prepare(s, &prepare_data);
1522 22 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1523 :
1524 22 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1525 :
1526 22 : switch (apply_action)
1527 : {
1528 10 : case TRANS_LEADER_APPLY:
1529 :
1530 : /*
1531 : * The transaction has been serialized to file, so replay all the
1532 : * spooled operations.
1533 : */
1534 10 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1535 : prepare_data.xid, prepare_data.prepare_lsn);
1536 :
1537 : /* Mark the transaction as prepared. */
1538 10 : apply_handle_prepare_internal(&prepare_data);
1539 :
1540 10 : CommitTransactionCommand();
1541 :
1542 : /*
1543 : * It is okay not to set the local_end LSN for the prepare because
1544 : * we always flush the prepare record. See apply_handle_prepare.
1545 : */
1546 10 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1547 :
1548 10 : in_remote_transaction = false;
1549 :
1550 : /* Unlink the files with serialized changes and subxact info. */
1551 10 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1552 :
1553 10 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1554 10 : break;
1555 :
1556 4 : case TRANS_LEADER_SEND_TO_PARALLEL:
1557 : Assert(winfo);
1558 :
1559 4 : if (pa_send_data(winfo, s->len, s->data))
1560 : {
1561 : /* Finish processing the streaming transaction. */
1562 4 : pa_xact_finish(winfo, prepare_data.end_lsn);
1563 4 : break;
1564 : }
1565 :
1566 : /*
1567 : * Switch to serialize mode when we are not able to send the
1568 : * change to parallel apply worker.
1569 : */
1570 0 : pa_switch_to_partial_serialize(winfo, true);
1571 :
1572 : /* fall through */
1573 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1574 : Assert(winfo);
1575 :
1576 2 : stream_open_and_write_change(prepare_data.xid,
1577 : LOGICAL_REP_MSG_STREAM_PREPARE,
1578 : &original_msg);
1579 :
1580 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1581 :
1582 : /* Finish processing the streaming transaction. */
1583 2 : pa_xact_finish(winfo, prepare_data.end_lsn);
1584 2 : break;
1585 :
1586 6 : case TRANS_PARALLEL_APPLY:
1587 :
1588 : /*
1589 : * If the parallel apply worker is applying spooled messages then
1590 : * close the file before preparing.
1591 : */
1592 6 : if (stream_fd)
1593 2 : stream_close_file();
1594 :
1595 6 : begin_replication_step();
1596 :
1597 : /* Mark the transaction as prepared. */
1598 6 : apply_handle_prepare_internal(&prepare_data);
1599 :
1600 6 : end_replication_step();
1601 :
1602 6 : CommitTransactionCommand();
1603 :
1604 : /*
1605 : * It is okay not to set the local_end LSN for the prepare because
1606 : * we always flush the prepare record. See apply_handle_prepare.
1607 : */
1608 6 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1609 :
1610 6 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1611 6 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1612 :
1613 6 : pa_reset_subtrans();
1614 :
1615 6 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1616 6 : break;
1617 :
1618 0 : default:
1619 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1620 : break;
1621 : }
1622 :
1623 22 : pgstat_report_stat(false);
1624 :
1625 : /* Process any tables that are being synchronized in parallel. */
1626 22 : ProcessSyncingRelations(prepare_data.end_lsn);
1627 :
1628 : /*
1629 : * Similar to prepare case, the subskiplsn could be left in a case of
1630 : * server crash but it's okay. See the comments in apply_handle_prepare().
1631 : */
1632 22 : stop_skipping_changes();
1633 22 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1634 :
1635 22 : pgstat_report_activity(STATE_IDLE, NULL);
1636 :
1637 22 : reset_apply_error_context_info();
1638 22 : }
1639 :
1640 : /*
1641 : * Handle ORIGIN message.
1642 : *
1643 : * TODO, support tracking of multiple origins
1644 : */
1645 : static void
1646 14 : apply_handle_origin(StringInfo s)
1647 : {
1648 : /*
1649 : * ORIGIN message can only come inside streaming transaction or inside
1650 : * remote transaction and before any actual writes.
1651 : */
1652 14 : if (!in_streamed_transaction &&
1653 20 : (!in_remote_transaction ||
1654 10 : (IsTransactionState() && !am_tablesync_worker())))
1655 0 : ereport(ERROR,
1656 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1657 : errmsg_internal("ORIGIN message sent out of order")));
1658 14 : }
1659 :
1660 : /*
1661 : * Initialize fileset (if not already done).
1662 : *
1663 : * Create a new file when first_segment is true, otherwise open the existing
1664 : * file.
1665 : */
1666 : void
1667 726 : stream_start_internal(TransactionId xid, bool first_segment)
1668 : {
1669 726 : begin_replication_step();
1670 :
1671 : /*
1672 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1673 : * used for the entire duration of the worker so create it in a permanent
1674 : * context. We create this on the very first streaming message from any
1675 : * transaction and then use it for this and other streaming transactions.
1676 : * Now, we could create a fileset at the start of the worker as well but
1677 : * then we won't be sure that it will ever be used.
1678 : */
1679 726 : if (!MyLogicalRepWorker->stream_fileset)
1680 : {
1681 : MemoryContext oldctx;
1682 :
1683 28 : oldctx = MemoryContextSwitchTo(ApplyContext);
1684 :
1685 28 : MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1686 28 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1687 :
1688 28 : MemoryContextSwitchTo(oldctx);
1689 : }
1690 :
1691 : /* Open the spool file for this transaction. */
1692 726 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1693 :
1694 : /* If this is not the first segment, open existing subxact file. */
1695 726 : if (!first_segment)
1696 662 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1697 :
1698 726 : end_replication_step();
1699 726 : }
1700 :
1701 : /*
1702 : * Handle STREAM START message.
1703 : */
1704 : static void
1705 1666 : apply_handle_stream_start(StringInfo s)
1706 : {
1707 : bool first_segment;
1708 : ParallelApplyWorkerInfo *winfo;
1709 : TransApplyAction apply_action;
1710 :
1711 : /* Save the message before it is consumed. */
1712 1666 : StringInfoData original_msg = *s;
1713 :
1714 1666 : if (in_streamed_transaction)
1715 0 : ereport(ERROR,
1716 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1717 : errmsg_internal("duplicate STREAM START message")));
1718 :
1719 : /* There must not be an active streaming transaction. */
1720 : Assert(!TransactionIdIsValid(stream_xid));
1721 :
1722 : /* notify handle methods we're processing a remote transaction */
1723 1666 : in_streamed_transaction = true;
1724 :
1725 : /* extract XID of the top-level transaction */
1726 1666 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1727 :
1728 1666 : if (!TransactionIdIsValid(stream_xid))
1729 0 : ereport(ERROR,
1730 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1731 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1732 :
1733 1666 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1734 :
1735 : /* Try to allocate a worker for the streaming transaction. */
1736 1666 : if (first_segment)
1737 160 : pa_allocate_worker(stream_xid);
1738 :
1739 1666 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1740 :
1741 1666 : switch (apply_action)
1742 : {
1743 686 : case TRANS_LEADER_SERIALIZE:
1744 :
1745 : /*
1746 : * Function stream_start_internal starts a transaction. This
1747 : * transaction will be committed on the stream stop unless it is a
1748 : * tablesync worker in which case it will be committed after
1749 : * processing all the messages. We need this transaction for
1750 : * handling the BufFile, used for serializing the streaming data
1751 : * and subxact info.
1752 : */
1753 686 : stream_start_internal(stream_xid, first_segment);
1754 686 : break;
1755 :
1756 478 : case TRANS_LEADER_SEND_TO_PARALLEL:
1757 : Assert(winfo);
1758 :
1759 : /*
1760 : * Once we start serializing the changes, the parallel apply
1761 : * worker will wait for the leader to release the stream lock
1762 : * until the end of the transaction. So, we don't need to release
1763 : * the lock or increment the stream count in that case.
1764 : */
1765 478 : if (pa_send_data(winfo, s->len, s->data))
1766 : {
1767 : /*
1768 : * Unlock the shared object lock so that the parallel apply
1769 : * worker can continue to receive changes.
1770 : */
1771 470 : if (!first_segment)
1772 426 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1773 :
1774 : /*
1775 : * Increment the number of streaming blocks waiting to be
1776 : * processed by parallel apply worker.
1777 : */
1778 470 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1779 :
1780 : /* Cache the parallel apply worker for this transaction. */
1781 470 : pa_set_stream_apply_worker(winfo);
1782 470 : break;
1783 : }
1784 :
1785 : /*
1786 : * Switch to serialize mode when we are not able to send the
1787 : * change to parallel apply worker.
1788 : */
1789 8 : pa_switch_to_partial_serialize(winfo, !first_segment);
1790 :
1791 : /* fall through */
1792 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1793 : Assert(winfo);
1794 :
1795 : /*
1796 : * Open the spool file unless it was already opened when switching
1797 : * to serialize mode. The transaction started in
1798 : * stream_start_internal will be committed on the stream stop.
1799 : */
1800 30 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1801 22 : stream_start_internal(stream_xid, first_segment);
1802 :
1803 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1804 :
1805 : /* Cache the parallel apply worker for this transaction. */
1806 30 : pa_set_stream_apply_worker(winfo);
1807 30 : break;
1808 :
1809 480 : case TRANS_PARALLEL_APPLY:
1810 480 : if (first_segment)
1811 : {
1812 : /* Hold the lock until the end of the transaction. */
1813 52 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1814 52 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1815 :
1816 : /*
1817 : * Signal the leader apply worker, as it may be waiting for
1818 : * us.
1819 : */
1820 52 : logicalrep_worker_wakeup(WORKERTYPE_APPLY,
1821 52 : MyLogicalRepWorker->subid, InvalidOid);
1822 : }
1823 :
1824 480 : parallel_stream_nchanges = 0;
1825 480 : break;
1826 :
1827 0 : default:
1828 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1829 : break;
1830 : }
1831 :
1832 1666 : pgstat_report_activity(STATE_RUNNING, NULL);
1833 1666 : }
1834 :
1835 : /*
1836 : * Update the information about subxacts and close the file.
1837 : *
1838 : * This function should be called when the stream_start_internal function has
1839 : * been called.
1840 : */
1841 : void
1842 726 : stream_stop_internal(TransactionId xid)
1843 : {
1844 : /*
1845 : * Serialize information about subxacts for the toplevel transaction, then
1846 : * close the stream messages spool file.
1847 : */
1848 726 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1849 726 : stream_close_file();
1850 :
1851 : /* We must be in a valid transaction state */
1852 : Assert(IsTransactionState());
1853 :
1854 : /* Commit the per-stream transaction */
1855 726 : CommitTransactionCommand();
1856 :
1857 : /* Reset per-stream context */
1858 726 : MemoryContextReset(LogicalStreamingContext);
1859 726 : }
1860 :
1861 : /*
1862 : * Handle STREAM STOP message.
1863 : */
1864 : static void
1865 1664 : apply_handle_stream_stop(StringInfo s)
1866 : {
1867 : ParallelApplyWorkerInfo *winfo;
1868 : TransApplyAction apply_action;
1869 :
1870 1664 : if (!in_streamed_transaction)
1871 0 : ereport(ERROR,
1872 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1873 : errmsg_internal("STREAM STOP message without STREAM START")));
1874 :
1875 1664 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1876 :
1877 1664 : switch (apply_action)
1878 : {
1879 686 : case TRANS_LEADER_SERIALIZE:
1880 686 : stream_stop_internal(stream_xid);
1881 686 : break;
1882 :
1883 470 : case TRANS_LEADER_SEND_TO_PARALLEL:
1884 : Assert(winfo);
1885 :
1886 : /*
1887 : * Lock before sending the STREAM_STOP message so that the leader
1888 : * can hold the lock first and the parallel apply worker will wait
1889 : * for leader to release the lock. See Locking Considerations atop
1890 : * applyparallelworker.c.
1891 : */
1892 470 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1893 :
1894 470 : if (pa_send_data(winfo, s->len, s->data))
1895 : {
1896 470 : pa_set_stream_apply_worker(NULL);
1897 470 : break;
1898 : }
1899 :
1900 : /*
1901 : * Switch to serialize mode when we are not able to send the
1902 : * change to parallel apply worker.
1903 : */
1904 0 : pa_switch_to_partial_serialize(winfo, true);
1905 :
1906 : /* fall through */
1907 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1908 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1909 30 : stream_stop_internal(stream_xid);
1910 30 : pa_set_stream_apply_worker(NULL);
1911 30 : break;
1912 :
1913 478 : case TRANS_PARALLEL_APPLY:
1914 478 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1915 : parallel_stream_nchanges);
1916 :
1917 : /*
1918 : * By the time parallel apply worker is processing the changes in
1919 : * the current streaming block, the leader apply worker may have
1920 : * sent multiple streaming blocks. This can lead to parallel apply
1921 : * worker start waiting even when there are more chunk of streams
1922 : * in the queue. So, try to lock only if there is no message left
1923 : * in the queue. See Locking Considerations atop
1924 : * applyparallelworker.c.
1925 : *
1926 : * Note that here we have a race condition where we can start
1927 : * waiting even when there are pending streaming chunks. This can
1928 : * happen if the leader sends another streaming block and acquires
1929 : * the stream lock again after the parallel apply worker checks
1930 : * that there is no pending streaming block and before it actually
1931 : * starts waiting on a lock. We can handle this case by not
1932 : * allowing the leader to increment the stream block count during
1933 : * the time parallel apply worker acquires the lock but it is not
1934 : * clear whether that is worth the complexity.
1935 : *
1936 : * Now, if this missed chunk contains rollback to savepoint, then
1937 : * there is a risk of deadlock which probably shouldn't happen
1938 : * after restart.
1939 : */
1940 478 : pa_decr_and_wait_stream_block();
1941 474 : break;
1942 :
1943 0 : default:
1944 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1945 : break;
1946 : }
1947 :
1948 1660 : in_streamed_transaction = false;
1949 1660 : stream_xid = InvalidTransactionId;
1950 :
1951 : /*
1952 : * The parallel apply worker could be in a transaction in which case we
1953 : * need to report the state as STATE_IDLEINTRANSACTION.
1954 : */
1955 1660 : if (IsTransactionOrTransactionBlock())
1956 474 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1957 : else
1958 1186 : pgstat_report_activity(STATE_IDLE, NULL);
1959 :
1960 1660 : reset_apply_error_context_info();
1961 1660 : }
1962 :
1963 : /*
1964 : * Helper function to handle STREAM ABORT message when the transaction was
1965 : * serialized to file.
1966 : */
1967 : static void
1968 28 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1969 : {
1970 : /*
1971 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1972 : * just delete the files with serialized info.
1973 : */
1974 28 : if (xid == subxid)
1975 2 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1976 : else
1977 : {
1978 : /*
1979 : * OK, so it's a subxact. We need to read the subxact file for the
1980 : * toplevel transaction, determine the offset tracked for the subxact,
1981 : * and truncate the file with changes. We also remove the subxacts
1982 : * with higher offsets (or rather higher XIDs).
1983 : *
1984 : * We intentionally scan the array from the tail, because we're likely
1985 : * aborting a change for the most recent subtransactions.
1986 : *
1987 : * We can't use the binary search here as subxact XIDs won't
1988 : * necessarily arrive in sorted order, consider the case where we have
1989 : * released the savepoint for multiple subtransactions and then
1990 : * performed rollback to savepoint for one of the earlier
1991 : * sub-transaction.
1992 : */
1993 : int64 i;
1994 : int64 subidx;
1995 : BufFile *fd;
1996 26 : bool found = false;
1997 : char path[MAXPGPATH];
1998 :
1999 26 : subidx = -1;
2000 26 : begin_replication_step();
2001 26 : subxact_info_read(MyLogicalRepWorker->subid, xid);
2002 :
2003 30 : for (i = subxact_data.nsubxacts; i > 0; i--)
2004 : {
2005 22 : if (subxact_data.subxacts[i - 1].xid == subxid)
2006 : {
2007 18 : subidx = (i - 1);
2008 18 : found = true;
2009 18 : break;
2010 : }
2011 : }
2012 :
2013 : /*
2014 : * If it's an empty sub-transaction then we will not find the subxid
2015 : * here so just cleanup the subxact info and return.
2016 : */
2017 26 : if (!found)
2018 : {
2019 : /* Cleanup the subxact info */
2020 8 : cleanup_subxact_info();
2021 8 : end_replication_step();
2022 8 : CommitTransactionCommand();
2023 8 : return;
2024 : }
2025 :
2026 : /* open the changes file */
2027 18 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2028 18 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
2029 : O_RDWR, false);
2030 :
2031 : /* OK, truncate the file at the right offset */
2032 18 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
2033 18 : subxact_data.subxacts[subidx].offset);
2034 18 : BufFileClose(fd);
2035 :
2036 : /* discard the subxacts added later */
2037 18 : subxact_data.nsubxacts = subidx;
2038 :
2039 : /* write the updated subxact list */
2040 18 : subxact_info_write(MyLogicalRepWorker->subid, xid);
2041 :
2042 18 : end_replication_step();
2043 18 : CommitTransactionCommand();
2044 : }
2045 : }
2046 :
2047 : /*
2048 : * Handle STREAM ABORT message.
2049 : */
2050 : static void
2051 76 : apply_handle_stream_abort(StringInfo s)
2052 : {
2053 : TransactionId xid;
2054 : TransactionId subxid;
2055 : LogicalRepStreamAbortData abort_data;
2056 : ParallelApplyWorkerInfo *winfo;
2057 : TransApplyAction apply_action;
2058 :
2059 : /* Save the message before it is consumed. */
2060 76 : StringInfoData original_msg = *s;
2061 : bool toplevel_xact;
2062 :
2063 76 : if (in_streamed_transaction)
2064 0 : ereport(ERROR,
2065 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2066 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
2067 :
2068 : /* We receive abort information only when we can apply in parallel. */
2069 76 : logicalrep_read_stream_abort(s, &abort_data,
2070 76 : MyLogicalRepWorker->parallel_apply);
2071 :
2072 76 : xid = abort_data.xid;
2073 76 : subxid = abort_data.subxid;
2074 76 : toplevel_xact = (xid == subxid);
2075 :
2076 76 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2077 :
2078 76 : apply_action = get_transaction_apply_action(xid, &winfo);
2079 :
2080 76 : switch (apply_action)
2081 : {
2082 28 : case TRANS_LEADER_APPLY:
2083 :
2084 : /*
2085 : * We are in the leader apply worker and the transaction has been
2086 : * serialized to file.
2087 : */
2088 28 : stream_abort_internal(xid, subxid);
2089 :
2090 28 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2091 28 : break;
2092 :
2093 20 : case TRANS_LEADER_SEND_TO_PARALLEL:
2094 : Assert(winfo);
2095 :
2096 : /*
2097 : * For the case of aborting the subtransaction, we increment the
2098 : * number of streaming blocks and take the lock again before
2099 : * sending the STREAM_ABORT to ensure that the parallel apply
2100 : * worker will wait on the lock for the next set of changes after
2101 : * processing the STREAM_ABORT message if it is not already
2102 : * waiting for STREAM_STOP message.
2103 : *
2104 : * It is important to perform this locking before sending the
2105 : * STREAM_ABORT message so that the leader can hold the lock first
2106 : * and the parallel apply worker will wait for the leader to
2107 : * release the lock. This is the same as what we do in
2108 : * apply_handle_stream_stop. See Locking Considerations atop
2109 : * applyparallelworker.c.
2110 : */
2111 20 : if (!toplevel_xact)
2112 : {
2113 18 : pa_unlock_stream(xid, AccessExclusiveLock);
2114 18 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
2115 18 : pa_lock_stream(xid, AccessExclusiveLock);
2116 : }
2117 :
2118 20 : if (pa_send_data(winfo, s->len, s->data))
2119 : {
2120 : /*
2121 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2122 : * wait here for the parallel apply worker to finish as that
2123 : * is not required to maintain the commit order and won't have
2124 : * the risk of failures due to transaction dependencies and
2125 : * deadlocks. However, it is possible that before the parallel
2126 : * worker finishes and we clear the worker info, the xid
2127 : * wraparound happens on the upstream and a new transaction
2128 : * with the same xid can appear and that can lead to duplicate
2129 : * entries in ParallelApplyTxnHash. Yet another problem could
2130 : * be that we may have serialized the changes in partial
2131 : * serialize mode and the file containing xact changes may
2132 : * already exist, and after xid wraparound trying to create
2133 : * the file for the same xid can lead to an error. To avoid
2134 : * these problems, we decide to wait for the aborts to finish.
2135 : *
2136 : * Note, it is okay to not update the flush location position
2137 : * for aborts as in worst case that means such a transaction
2138 : * won't be sent again after restart.
2139 : */
2140 20 : if (toplevel_xact)
2141 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2142 :
2143 20 : break;
2144 : }
2145 :
2146 : /*
2147 : * Switch to serialize mode when we are not able to send the
2148 : * change to parallel apply worker.
2149 : */
2150 0 : pa_switch_to_partial_serialize(winfo, true);
2151 :
2152 : /* fall through */
2153 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2154 : Assert(winfo);
2155 :
2156 : /*
2157 : * Parallel apply worker might have applied some changes, so write
2158 : * the STREAM_ABORT message so that it can rollback the
2159 : * subtransaction if needed.
2160 : */
2161 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
2162 : &original_msg);
2163 :
2164 4 : if (toplevel_xact)
2165 : {
2166 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2167 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2168 : }
2169 4 : break;
2170 :
2171 24 : case TRANS_PARALLEL_APPLY:
2172 :
2173 : /*
2174 : * If the parallel apply worker is applying spooled messages then
2175 : * close the file before aborting.
2176 : */
2177 24 : if (toplevel_xact && stream_fd)
2178 2 : stream_close_file();
2179 :
2180 24 : pa_stream_abort(&abort_data);
2181 :
2182 : /*
2183 : * We need to wait after processing rollback to savepoint for the
2184 : * next set of changes.
2185 : *
2186 : * We have a race condition here due to which we can start waiting
2187 : * here when there are more chunk of streams in the queue. See
2188 : * apply_handle_stream_stop.
2189 : */
2190 24 : if (!toplevel_xact)
2191 20 : pa_decr_and_wait_stream_block();
2192 :
2193 24 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2194 24 : break;
2195 :
2196 0 : default:
2197 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2198 : break;
2199 : }
2200 :
2201 76 : reset_apply_error_context_info();
2202 76 : }
2203 :
2204 : /*
2205 : * Ensure that the passed location is fileset's end.
2206 : */
2207 : static void
2208 8 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2209 : off_t offset)
2210 : {
2211 : char path[MAXPGPATH];
2212 : BufFile *fd;
2213 : int last_fileno;
2214 : off_t last_offset;
2215 :
2216 : Assert(!IsTransactionState());
2217 :
2218 8 : begin_replication_step();
2219 :
2220 8 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2221 :
2222 8 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2223 :
2224 8 : BufFileSeek(fd, 0, 0, SEEK_END);
2225 8 : BufFileTell(fd, &last_fileno, &last_offset);
2226 :
2227 8 : BufFileClose(fd);
2228 :
2229 8 : end_replication_step();
2230 :
2231 8 : if (last_fileno != fileno || last_offset != offset)
2232 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2233 : path);
2234 8 : }
2235 :
2236 : /*
2237 : * Common spoolfile processing.
2238 : */
2239 : void
2240 62 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2241 : XLogRecPtr lsn)
2242 : {
2243 : int nchanges;
2244 : char path[MAXPGPATH];
2245 62 : char *buffer = NULL;
2246 : MemoryContext oldcxt;
2247 : ResourceOwner oldowner;
2248 : int fileno;
2249 : off_t offset;
2250 :
2251 62 : if (!am_parallel_apply_worker())
2252 54 : maybe_start_skipping_changes(lsn);
2253 :
2254 : /* Make sure we have an open transaction */
2255 62 : begin_replication_step();
2256 :
2257 : /*
2258 : * Allocate file handle and memory required to process all the messages in
2259 : * TopTransactionContext to avoid them getting reset after each message is
2260 : * processed.
2261 : */
2262 62 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2263 :
2264 : /* Open the spool file for the committed/prepared transaction */
2265 62 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2266 62 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2267 :
2268 : /*
2269 : * Make sure the file is owned by the toplevel transaction so that the
2270 : * file will not be accidentally closed when aborting a subtransaction.
2271 : */
2272 62 : oldowner = CurrentResourceOwner;
2273 62 : CurrentResourceOwner = TopTransactionResourceOwner;
2274 :
2275 62 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2276 :
2277 62 : CurrentResourceOwner = oldowner;
2278 :
2279 62 : buffer = palloc(BLCKSZ);
2280 :
2281 62 : MemoryContextSwitchTo(oldcxt);
2282 :
2283 62 : remote_final_lsn = lsn;
2284 :
2285 : /*
2286 : * Make sure the handle apply_dispatch methods are aware we're in a remote
2287 : * transaction.
2288 : */
2289 62 : in_remote_transaction = true;
2290 62 : pgstat_report_activity(STATE_RUNNING, NULL);
2291 :
2292 62 : end_replication_step();
2293 :
2294 : /*
2295 : * Read the entries one by one and pass them through the same logic as in
2296 : * apply_dispatch.
2297 : */
2298 62 : nchanges = 0;
2299 : while (true)
2300 176940 : {
2301 : StringInfoData s2;
2302 : size_t nbytes;
2303 : int len;
2304 :
2305 177002 : CHECK_FOR_INTERRUPTS();
2306 :
2307 : /* read length of the on-disk record */
2308 177002 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2309 :
2310 : /* have we reached end of the file? */
2311 177002 : if (nbytes == 0)
2312 52 : break;
2313 :
2314 : /* do we have a correct length? */
2315 176950 : if (len <= 0)
2316 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2317 : len, path);
2318 :
2319 : /* make sure we have sufficiently large buffer */
2320 176950 : buffer = repalloc(buffer, len);
2321 :
2322 : /* and finally read the data into the buffer */
2323 176950 : BufFileReadExact(stream_fd, buffer, len);
2324 :
2325 176950 : BufFileTell(stream_fd, &fileno, &offset);
2326 :
2327 : /* init a stringinfo using the buffer and call apply_dispatch */
2328 176950 : initReadOnlyStringInfo(&s2, buffer, len);
2329 :
2330 : /* Ensure we are reading the data into our memory context. */
2331 176950 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2332 :
2333 176950 : apply_dispatch(&s2);
2334 :
2335 176948 : MemoryContextReset(ApplyMessageContext);
2336 :
2337 176948 : MemoryContextSwitchTo(oldcxt);
2338 :
2339 176948 : nchanges++;
2340 :
2341 : /*
2342 : * It is possible the file has been closed because we have processed
2343 : * the transaction end message like stream_commit in which case that
2344 : * must be the last message.
2345 : */
2346 176948 : if (!stream_fd)
2347 : {
2348 8 : ensure_last_message(stream_fileset, xid, fileno, offset);
2349 8 : break;
2350 : }
2351 :
2352 176940 : if (nchanges % 1000 == 0)
2353 168 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2354 : nchanges, path);
2355 : }
2356 :
2357 60 : if (stream_fd)
2358 52 : stream_close_file();
2359 :
2360 60 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2361 : nchanges, path);
2362 :
2363 60 : return;
2364 : }
2365 :
2366 : /*
2367 : * Handle STREAM COMMIT message.
2368 : */
2369 : static void
2370 118 : apply_handle_stream_commit(StringInfo s)
2371 : {
2372 : TransactionId xid;
2373 : LogicalRepCommitData commit_data;
2374 : ParallelApplyWorkerInfo *winfo;
2375 : TransApplyAction apply_action;
2376 :
2377 : /* Save the message before it is consumed. */
2378 118 : StringInfoData original_msg = *s;
2379 :
2380 118 : if (in_streamed_transaction)
2381 0 : ereport(ERROR,
2382 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2383 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2384 :
2385 118 : xid = logicalrep_read_stream_commit(s, &commit_data);
2386 118 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2387 :
2388 118 : apply_action = get_transaction_apply_action(xid, &winfo);
2389 :
2390 118 : switch (apply_action)
2391 : {
2392 44 : case TRANS_LEADER_APPLY:
2393 :
2394 : /*
2395 : * The transaction has been serialized to file, so replay all the
2396 : * spooled operations.
2397 : */
2398 44 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2399 : commit_data.commit_lsn);
2400 :
2401 42 : apply_handle_commit_internal(&commit_data);
2402 :
2403 : /* Unlink the files with serialized changes and subxact info. */
2404 42 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2405 :
2406 42 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2407 42 : break;
2408 :
2409 34 : case TRANS_LEADER_SEND_TO_PARALLEL:
2410 : Assert(winfo);
2411 :
2412 34 : if (pa_send_data(winfo, s->len, s->data))
2413 : {
2414 : /* Finish processing the streaming transaction. */
2415 34 : pa_xact_finish(winfo, commit_data.end_lsn);
2416 32 : break;
2417 : }
2418 :
2419 : /*
2420 : * Switch to serialize mode when we are not able to send the
2421 : * change to parallel apply worker.
2422 : */
2423 0 : pa_switch_to_partial_serialize(winfo, true);
2424 :
2425 : /* fall through */
2426 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2427 : Assert(winfo);
2428 :
2429 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2430 : &original_msg);
2431 :
2432 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2433 :
2434 : /* Finish processing the streaming transaction. */
2435 4 : pa_xact_finish(winfo, commit_data.end_lsn);
2436 4 : break;
2437 :
2438 36 : case TRANS_PARALLEL_APPLY:
2439 :
2440 : /*
2441 : * If the parallel apply worker is applying spooled messages then
2442 : * close the file before committing.
2443 : */
2444 36 : if (stream_fd)
2445 4 : stream_close_file();
2446 :
2447 36 : apply_handle_commit_internal(&commit_data);
2448 :
2449 36 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2450 :
2451 : /*
2452 : * It is important to set the transaction state as finished before
2453 : * releasing the lock. See pa_wait_for_xact_finish.
2454 : */
2455 36 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2456 36 : pa_unlock_transaction(xid, AccessExclusiveLock);
2457 :
2458 36 : pa_reset_subtrans();
2459 :
2460 36 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2461 36 : break;
2462 :
2463 0 : default:
2464 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2465 : break;
2466 : }
2467 :
2468 : /* Process any tables that are being synchronized in parallel. */
2469 114 : ProcessSyncingRelations(commit_data.end_lsn);
2470 :
2471 114 : pgstat_report_activity(STATE_IDLE, NULL);
2472 :
2473 114 : reset_apply_error_context_info();
2474 114 : }
2475 :
2476 : /*
2477 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2478 : */
2479 : static void
2480 954 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2481 : {
2482 954 : if (is_skipping_changes())
2483 : {
2484 4 : stop_skipping_changes();
2485 :
2486 : /*
2487 : * Start a new transaction to clear the subskiplsn, if not started
2488 : * yet.
2489 : */
2490 4 : if (!IsTransactionState())
2491 2 : StartTransactionCommand();
2492 : }
2493 :
2494 954 : if (IsTransactionState())
2495 : {
2496 : /*
2497 : * The transaction is either non-empty or skipped, so we clear the
2498 : * subskiplsn.
2499 : */
2500 954 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2501 :
2502 : /*
2503 : * Update origin state so we can restart streaming from correct
2504 : * position in case of crash.
2505 : */
2506 954 : replorigin_session_origin_lsn = commit_data->end_lsn;
2507 954 : replorigin_session_origin_timestamp = commit_data->committime;
2508 :
2509 954 : CommitTransactionCommand();
2510 :
2511 954 : if (IsTransactionBlock())
2512 : {
2513 8 : EndTransactionBlock(false);
2514 8 : CommitTransactionCommand();
2515 : }
2516 :
2517 954 : pgstat_report_stat(false);
2518 :
2519 954 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2520 : }
2521 : else
2522 : {
2523 : /* Process any invalidation messages that might have accumulated. */
2524 0 : AcceptInvalidationMessages();
2525 0 : maybe_reread_subscription();
2526 : }
2527 :
2528 954 : in_remote_transaction = false;
2529 954 : }
2530 :
2531 : /*
2532 : * Handle RELATION message.
2533 : *
2534 : * Note we don't do validation against local schema here. The validation
2535 : * against local schema is postponed until first change for given relation
2536 : * comes as we only care about it when applying changes for it anyway and we
2537 : * do less locking this way.
2538 : */
2539 : static void
2540 918 : apply_handle_relation(StringInfo s)
2541 : {
2542 : LogicalRepRelation *rel;
2543 :
2544 918 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2545 72 : return;
2546 :
2547 846 : rel = logicalrep_read_rel(s);
2548 846 : logicalrep_relmap_update(rel);
2549 :
2550 : /* Also reset all entries in the partition map that refer to remoterel. */
2551 846 : logicalrep_partmap_reset_relmap(rel);
2552 : }
2553 :
2554 : /*
2555 : * Handle TYPE message.
2556 : *
2557 : * This implementation pays no attention to TYPE messages; we expect the user
2558 : * to have set things up so that the incoming data is acceptable to the input
2559 : * functions for the locally subscribed tables. Hence, we just read and
2560 : * discard the message.
2561 : */
2562 : static void
2563 36 : apply_handle_type(StringInfo s)
2564 : {
2565 : LogicalRepTyp typ;
2566 :
2567 36 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2568 0 : return;
2569 :
2570 36 : logicalrep_read_typ(s, &typ);
2571 : }
2572 :
2573 : /*
2574 : * Check that we (the subscription owner) have sufficient privileges on the
2575 : * target relation to perform the given operation.
2576 : */
2577 : static void
2578 441212 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2579 : {
2580 : Oid relid;
2581 : AclResult aclresult;
2582 :
2583 441212 : relid = RelationGetRelid(rel);
2584 441212 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2585 441212 : if (aclresult != ACLCHECK_OK)
2586 14 : aclcheck_error(aclresult,
2587 14 : get_relkind_objtype(rel->rd_rel->relkind),
2588 14 : get_rel_name(relid));
2589 :
2590 : /*
2591 : * We lack the infrastructure to honor RLS policies. It might be possible
2592 : * to add such infrastructure here, but tablesync workers lack it, too, so
2593 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2594 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2595 : * replicate subsequent INSERTs, so we forbid all commands the same.
2596 : */
2597 441198 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2598 6 : ereport(ERROR,
2599 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2600 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2601 : GetUserNameFromId(GetUserId(), true),
2602 : RelationGetRelationName(rel))));
2603 441192 : }
2604 :
2605 : /*
2606 : * Handle INSERT message.
2607 : */
2608 :
2609 : static void
2610 362276 : apply_handle_insert(StringInfo s)
2611 : {
2612 : LogicalRepRelMapEntry *rel;
2613 : LogicalRepTupleData newtup;
2614 : LogicalRepRelId relid;
2615 : UserContext ucxt;
2616 : ApplyExecutionData *edata;
2617 : EState *estate;
2618 : TupleTableSlot *remoteslot;
2619 : MemoryContext oldctx;
2620 : bool run_as_owner;
2621 :
2622 : /*
2623 : * Quick return if we are skipping data modification changes or handling
2624 : * streamed transactions.
2625 : */
2626 704550 : if (is_skipping_changes() ||
2627 342274 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2628 210112 : return;
2629 :
2630 152262 : begin_replication_step();
2631 :
2632 152258 : relid = logicalrep_read_insert(s, &newtup);
2633 152258 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2634 152244 : if (!should_apply_changes_for_rel(rel))
2635 : {
2636 : /*
2637 : * The relation can't become interesting in the middle of the
2638 : * transaction so it's safe to unlock it.
2639 : */
2640 98 : logicalrep_rel_close(rel, RowExclusiveLock);
2641 98 : end_replication_step();
2642 98 : return;
2643 : }
2644 :
2645 : /*
2646 : * Make sure that any user-supplied code runs as the table owner, unless
2647 : * the user has opted out of that behavior.
2648 : */
2649 152146 : run_as_owner = MySubscription->runasowner;
2650 152146 : if (!run_as_owner)
2651 152130 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2652 :
2653 : /* Set relation for error callback */
2654 152146 : apply_error_callback_arg.rel = rel;
2655 :
2656 : /* Initialize the executor state. */
2657 152146 : edata = create_edata_for_relation(rel);
2658 152146 : estate = edata->estate;
2659 152146 : remoteslot = ExecInitExtraTupleSlot(estate,
2660 152146 : RelationGetDescr(rel->localrel),
2661 : &TTSOpsVirtual);
2662 :
2663 : /* Process and store remote tuple in the slot */
2664 152146 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2665 152146 : slot_store_data(remoteslot, rel, &newtup);
2666 152146 : slot_fill_defaults(rel, estate, remoteslot);
2667 152146 : MemoryContextSwitchTo(oldctx);
2668 :
2669 : /* For a partitioned table, insert the tuple into a partition. */
2670 152146 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2671 110 : apply_handle_tuple_routing(edata,
2672 : remoteslot, NULL, CMD_INSERT);
2673 : else
2674 : {
2675 152036 : ResultRelInfo *relinfo = edata->targetRelInfo;
2676 :
2677 152036 : ExecOpenIndices(relinfo, false);
2678 152036 : apply_handle_insert_internal(edata, relinfo, remoteslot);
2679 152006 : ExecCloseIndices(relinfo);
2680 : }
2681 :
2682 152094 : finish_edata(edata);
2683 :
2684 : /* Reset relation for error callback */
2685 152094 : apply_error_callback_arg.rel = NULL;
2686 :
2687 152094 : if (!run_as_owner)
2688 152084 : RestoreUserContext(&ucxt);
2689 :
2690 152094 : logicalrep_rel_close(rel, NoLock);
2691 :
2692 152094 : end_replication_step();
2693 : }
2694 :
2695 : /*
2696 : * Workhorse for apply_handle_insert()
2697 : * relinfo is for the relation we're actually inserting into
2698 : * (could be a child partition of edata->targetRelInfo)
2699 : */
2700 : static void
2701 152148 : apply_handle_insert_internal(ApplyExecutionData *edata,
2702 : ResultRelInfo *relinfo,
2703 : TupleTableSlot *remoteslot)
2704 : {
2705 152148 : EState *estate = edata->estate;
2706 :
2707 : /* Caller should have opened indexes already. */
2708 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2709 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2710 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2711 :
2712 : /* Caller will not have done this bit. */
2713 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2714 152148 : InitConflictIndexes(relinfo);
2715 :
2716 : /* Do the insert. */
2717 152148 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2718 152136 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2719 152096 : }
2720 :
2721 : /*
2722 : * Check if the logical replication relation is updatable and throw
2723 : * appropriate error if it isn't.
2724 : */
2725 : static void
2726 144582 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2727 : {
2728 : /*
2729 : * For partitioned tables, we only need to care if the target partition is
2730 : * updatable (aka has PK or RI defined for it).
2731 : */
2732 144582 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2733 60 : return;
2734 :
2735 : /* Updatable, no error. */
2736 144522 : if (rel->updatable)
2737 144522 : return;
2738 :
2739 : /*
2740 : * We are in error mode so it's fine this is somewhat slow. It's better to
2741 : * give user correct error.
2742 : */
2743 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2744 : {
2745 0 : ereport(ERROR,
2746 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2747 : errmsg("publisher did not send replica identity column "
2748 : "expected by the logical replication target relation \"%s.%s\"",
2749 : rel->remoterel.nspname, rel->remoterel.relname)));
2750 : }
2751 :
2752 0 : ereport(ERROR,
2753 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2754 : errmsg("logical replication target relation \"%s.%s\" has "
2755 : "neither REPLICA IDENTITY index nor PRIMARY "
2756 : "KEY and published relation does not have "
2757 : "REPLICA IDENTITY FULL",
2758 : rel->remoterel.nspname, rel->remoterel.relname)));
2759 : }
2760 :
2761 : /*
2762 : * Handle UPDATE message.
2763 : *
2764 : * TODO: FDW support
2765 : */
2766 : static void
2767 132328 : apply_handle_update(StringInfo s)
2768 : {
2769 : LogicalRepRelMapEntry *rel;
2770 : LogicalRepRelId relid;
2771 : UserContext ucxt;
2772 : ApplyExecutionData *edata;
2773 : EState *estate;
2774 : LogicalRepTupleData oldtup;
2775 : LogicalRepTupleData newtup;
2776 : bool has_oldtup;
2777 : TupleTableSlot *remoteslot;
2778 : RTEPermissionInfo *target_perminfo;
2779 : MemoryContext oldctx;
2780 : bool run_as_owner;
2781 :
2782 : /*
2783 : * Quick return if we are skipping data modification changes or handling
2784 : * streamed transactions.
2785 : */
2786 264650 : if (is_skipping_changes() ||
2787 132322 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
2788 68446 : return;
2789 :
2790 63882 : begin_replication_step();
2791 :
2792 63880 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2793 : &newtup);
2794 63880 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2795 63880 : if (!should_apply_changes_for_rel(rel))
2796 : {
2797 : /*
2798 : * The relation can't become interesting in the middle of the
2799 : * transaction so it's safe to unlock it.
2800 : */
2801 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2802 0 : end_replication_step();
2803 0 : return;
2804 : }
2805 :
2806 : /* Set relation for error callback */
2807 63880 : apply_error_callback_arg.rel = rel;
2808 :
2809 : /* Check if we can do the update. */
2810 63880 : check_relation_updatable(rel);
2811 :
2812 : /*
2813 : * Make sure that any user-supplied code runs as the table owner, unless
2814 : * the user has opted out of that behavior.
2815 : */
2816 63880 : run_as_owner = MySubscription->runasowner;
2817 63880 : if (!run_as_owner)
2818 63874 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2819 :
2820 : /* Initialize the executor state. */
2821 63878 : edata = create_edata_for_relation(rel);
2822 63878 : estate = edata->estate;
2823 63878 : remoteslot = ExecInitExtraTupleSlot(estate,
2824 63878 : RelationGetDescr(rel->localrel),
2825 : &TTSOpsVirtual);
2826 :
2827 : /*
2828 : * Populate updatedCols so that per-column triggers can fire, and so
2829 : * executor can correctly pass down indexUnchanged hint. This could
2830 : * include more columns than were actually changed on the publisher
2831 : * because the logical replication protocol doesn't contain that
2832 : * information. But it would for example exclude columns that only exist
2833 : * on the subscriber, since we are not touching those.
2834 : */
2835 63878 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
2836 318642 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2837 : {
2838 254764 : CompactAttribute *att = TupleDescCompactAttr(remoteslot->tts_tupleDescriptor, i);
2839 254764 : int remoteattnum = rel->attrmap->attnums[i];
2840 :
2841 254764 : if (!att->attisdropped && remoteattnum >= 0)
2842 : {
2843 : Assert(remoteattnum < newtup.ncols);
2844 137730 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2845 137724 : target_perminfo->updatedCols =
2846 137724 : bms_add_member(target_perminfo->updatedCols,
2847 : i + 1 - FirstLowInvalidHeapAttributeNumber);
2848 : }
2849 : }
2850 :
2851 : /* Build the search tuple. */
2852 63878 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2853 63878 : slot_store_data(remoteslot, rel,
2854 63878 : has_oldtup ? &oldtup : &newtup);
2855 63878 : MemoryContextSwitchTo(oldctx);
2856 :
2857 : /* For a partitioned table, apply update to correct partition. */
2858 63878 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2859 26 : apply_handle_tuple_routing(edata,
2860 : remoteslot, &newtup, CMD_UPDATE);
2861 : else
2862 63852 : apply_handle_update_internal(edata, edata->targetRelInfo,
2863 : remoteslot, &newtup, rel->localindexoid);
2864 :
2865 63866 : finish_edata(edata);
2866 :
2867 : /* Reset relation for error callback */
2868 63866 : apply_error_callback_arg.rel = NULL;
2869 :
2870 63866 : if (!run_as_owner)
2871 63862 : RestoreUserContext(&ucxt);
2872 :
2873 63866 : logicalrep_rel_close(rel, NoLock);
2874 :
2875 63866 : end_replication_step();
2876 : }
2877 :
2878 : /*
2879 : * Workhorse for apply_handle_update()
2880 : * relinfo is for the relation we're actually updating in
2881 : * (could be a child partition of edata->targetRelInfo)
2882 : */
2883 : static void
2884 63852 : apply_handle_update_internal(ApplyExecutionData *edata,
2885 : ResultRelInfo *relinfo,
2886 : TupleTableSlot *remoteslot,
2887 : LogicalRepTupleData *newtup,
2888 : Oid localindexoid)
2889 : {
2890 63852 : EState *estate = edata->estate;
2891 63852 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2892 63852 : Relation localrel = relinfo->ri_RelationDesc;
2893 : EPQState epqstate;
2894 63852 : TupleTableSlot *localslot = NULL;
2895 63852 : ConflictTupleInfo conflicttuple = {0};
2896 : bool found;
2897 : MemoryContext oldctx;
2898 :
2899 63852 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2900 63852 : ExecOpenIndices(relinfo, false);
2901 :
2902 63852 : found = FindReplTupleInLocalRel(edata, localrel,
2903 : &relmapentry->remoterel,
2904 : localindexoid,
2905 : remoteslot, &localslot);
2906 :
2907 : /*
2908 : * Tuple found.
2909 : *
2910 : * Note this will fail if there are other conflicting unique indexes.
2911 : */
2912 63844 : if (found)
2913 : {
2914 : /*
2915 : * Report the conflict if the tuple was modified by a different
2916 : * origin.
2917 : */
2918 63826 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2919 4 : &conflicttuple.origin, &conflicttuple.ts) &&
2920 4 : conflicttuple.origin != replorigin_session_origin)
2921 : {
2922 : TupleTableSlot *newslot;
2923 :
2924 : /* Store the new tuple for conflict reporting */
2925 4 : newslot = table_slot_create(localrel, &estate->es_tupleTable);
2926 4 : slot_store_data(newslot, relmapentry, newtup);
2927 :
2928 4 : conflicttuple.slot = localslot;
2929 :
2930 4 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
2931 : remoteslot, newslot,
2932 4 : list_make1(&conflicttuple));
2933 : }
2934 :
2935 : /* Process and store remote tuple in the slot */
2936 63826 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2937 63826 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2938 63826 : MemoryContextSwitchTo(oldctx);
2939 :
2940 63826 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2941 :
2942 63826 : InitConflictIndexes(relinfo);
2943 :
2944 : /* Do the actual update. */
2945 63826 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2946 63826 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2947 : remoteslot);
2948 : }
2949 : else
2950 : {
2951 : ConflictType type;
2952 18 : TupleTableSlot *newslot = localslot;
2953 :
2954 : /*
2955 : * Detecting whether the tuple was recently deleted or never existed
2956 : * is crucial to avoid misleading the user during conflict handling.
2957 : */
2958 18 : if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
2959 : &conflicttuple.xmin,
2960 : &conflicttuple.origin,
2961 6 : &conflicttuple.ts) &&
2962 6 : conflicttuple.origin != replorigin_session_origin)
2963 6 : type = CT_UPDATE_DELETED;
2964 : else
2965 12 : type = CT_UPDATE_MISSING;
2966 :
2967 : /* Store the new tuple for conflict reporting */
2968 18 : slot_store_data(newslot, relmapentry, newtup);
2969 :
2970 : /*
2971 : * The tuple to be updated could not be found or was deleted. Do
2972 : * nothing except for emitting a log message.
2973 : */
2974 18 : ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
2975 18 : list_make1(&conflicttuple));
2976 : }
2977 :
2978 : /* Cleanup. */
2979 63840 : ExecCloseIndices(relinfo);
2980 63840 : EvalPlanQualEnd(&epqstate);
2981 63840 : }
2982 :
2983 : /*
2984 : * Handle DELETE message.
2985 : *
2986 : * TODO: FDW support
2987 : */
2988 : static void
2989 163872 : apply_handle_delete(StringInfo s)
2990 : {
2991 : LogicalRepRelMapEntry *rel;
2992 : LogicalRepTupleData oldtup;
2993 : LogicalRepRelId relid;
2994 : UserContext ucxt;
2995 : ApplyExecutionData *edata;
2996 : EState *estate;
2997 : TupleTableSlot *remoteslot;
2998 : MemoryContext oldctx;
2999 : bool run_as_owner;
3000 :
3001 : /*
3002 : * Quick return if we are skipping data modification changes or handling
3003 : * streamed transactions.
3004 : */
3005 327744 : if (is_skipping_changes() ||
3006 163872 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
3007 83230 : return;
3008 :
3009 80642 : begin_replication_step();
3010 :
3011 80642 : relid = logicalrep_read_delete(s, &oldtup);
3012 80642 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
3013 80642 : if (!should_apply_changes_for_rel(rel))
3014 : {
3015 : /*
3016 : * The relation can't become interesting in the middle of the
3017 : * transaction so it's safe to unlock it.
3018 : */
3019 0 : logicalrep_rel_close(rel, RowExclusiveLock);
3020 0 : end_replication_step();
3021 0 : return;
3022 : }
3023 :
3024 : /* Set relation for error callback */
3025 80642 : apply_error_callback_arg.rel = rel;
3026 :
3027 : /* Check if we can do the delete. */
3028 80642 : check_relation_updatable(rel);
3029 :
3030 : /*
3031 : * Make sure that any user-supplied code runs as the table owner, unless
3032 : * the user has opted out of that behavior.
3033 : */
3034 80642 : run_as_owner = MySubscription->runasowner;
3035 80642 : if (!run_as_owner)
3036 80638 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3037 :
3038 : /* Initialize the executor state. */
3039 80642 : edata = create_edata_for_relation(rel);
3040 80642 : estate = edata->estate;
3041 80642 : remoteslot = ExecInitExtraTupleSlot(estate,
3042 80642 : RelationGetDescr(rel->localrel),
3043 : &TTSOpsVirtual);
3044 :
3045 : /* Build the search tuple. */
3046 80642 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3047 80642 : slot_store_data(remoteslot, rel, &oldtup);
3048 80642 : MemoryContextSwitchTo(oldctx);
3049 :
3050 : /* For a partitioned table, apply delete to correct partition. */
3051 80642 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3052 34 : apply_handle_tuple_routing(edata,
3053 : remoteslot, NULL, CMD_DELETE);
3054 : else
3055 : {
3056 80608 : ResultRelInfo *relinfo = edata->targetRelInfo;
3057 :
3058 80608 : ExecOpenIndices(relinfo, false);
3059 80608 : apply_handle_delete_internal(edata, relinfo,
3060 : remoteslot, rel->localindexoid);
3061 80608 : ExecCloseIndices(relinfo);
3062 : }
3063 :
3064 80642 : finish_edata(edata);
3065 :
3066 : /* Reset relation for error callback */
3067 80642 : apply_error_callback_arg.rel = NULL;
3068 :
3069 80642 : if (!run_as_owner)
3070 80638 : RestoreUserContext(&ucxt);
3071 :
3072 80642 : logicalrep_rel_close(rel, NoLock);
3073 :
3074 80642 : end_replication_step();
3075 : }
3076 :
3077 : /*
3078 : * Workhorse for apply_handle_delete()
3079 : * relinfo is for the relation we're actually deleting from
3080 : * (could be a child partition of edata->targetRelInfo)
3081 : */
3082 : static void
3083 80642 : apply_handle_delete_internal(ApplyExecutionData *edata,
3084 : ResultRelInfo *relinfo,
3085 : TupleTableSlot *remoteslot,
3086 : Oid localindexoid)
3087 : {
3088 80642 : EState *estate = edata->estate;
3089 80642 : Relation localrel = relinfo->ri_RelationDesc;
3090 80642 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3091 : EPQState epqstate;
3092 : TupleTableSlot *localslot;
3093 80642 : ConflictTupleInfo conflicttuple = {0};
3094 : bool found;
3095 :
3096 80642 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3097 :
3098 : /* Caller should have opened indexes already. */
3099 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
3100 : !localrel->rd_rel->relhasindex ||
3101 : RelationGetIndexList(localrel) == NIL);
3102 :
3103 80642 : found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3104 : remoteslot, &localslot);
3105 :
3106 : /* If found delete it. */
3107 80642 : if (found)
3108 : {
3109 : /*
3110 : * Report the conflict if the tuple was modified by a different
3111 : * origin.
3112 : */
3113 80624 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3114 12 : &conflicttuple.origin, &conflicttuple.ts) &&
3115 12 : conflicttuple.origin != replorigin_session_origin)
3116 : {
3117 10 : conflicttuple.slot = localslot;
3118 10 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
3119 : remoteslot, NULL,
3120 10 : list_make1(&conflicttuple));
3121 : }
3122 :
3123 80624 : EvalPlanQualSetSlot(&epqstate, localslot);
3124 :
3125 : /* Do the actual delete. */
3126 80624 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3127 80624 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3128 : }
3129 : else
3130 : {
3131 : /*
3132 : * The tuple to be deleted could not be found. Do nothing except for
3133 : * emitting a log message.
3134 : */
3135 18 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
3136 18 : remoteslot, NULL, list_make1(&conflicttuple));
3137 : }
3138 :
3139 : /* Cleanup. */
3140 80642 : EvalPlanQualEnd(&epqstate);
3141 80642 : }
3142 :
3143 : /*
3144 : * Try to find a tuple received from the publication side (in 'remoteslot') in
3145 : * the corresponding local relation using either replica identity index,
3146 : * primary key, index or if needed, sequential scan.
3147 : *
3148 : * Local tuple, if found, is returned in '*localslot'.
3149 : */
3150 : static bool
3151 144520 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
3152 : LogicalRepRelation *remoterel,
3153 : Oid localidxoid,
3154 : TupleTableSlot *remoteslot,
3155 : TupleTableSlot **localslot)
3156 : {
3157 144520 : EState *estate = edata->estate;
3158 : bool found;
3159 :
3160 : /*
3161 : * Regardless of the top-level operation, we're performing a read here, so
3162 : * check for SELECT privileges.
3163 : */
3164 144520 : TargetPrivilegesCheck(localrel, ACL_SELECT);
3165 :
3166 144512 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3167 :
3168 : Assert(OidIsValid(localidxoid) ||
3169 : (remoterel->replident == REPLICA_IDENTITY_FULL));
3170 :
3171 144512 : if (OidIsValid(localidxoid))
3172 : {
3173 : #ifdef USE_ASSERT_CHECKING
3174 : Relation idxrel = index_open(localidxoid, AccessShareLock);
3175 :
3176 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3177 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
3178 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
3179 : IsIndexUsableForReplicaIdentityFull(idxrel,
3180 : edata->targetRel->attrmap)));
3181 : index_close(idxrel, AccessShareLock);
3182 : #endif
3183 :
3184 144210 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
3185 : LockTupleExclusive,
3186 : remoteslot, *localslot);
3187 : }
3188 : else
3189 302 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
3190 : remoteslot, *localslot);
3191 :
3192 144512 : return found;
3193 : }
3194 :
3195 : /*
3196 : * Determine whether the index can reliably locate the deleted tuple in the
3197 : * local relation.
3198 : *
3199 : * An index may exclude deleted tuples if it was re-indexed or re-created during
3200 : * change application. Therefore, an index is considered usable only if the
3201 : * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3202 : * index tuple's xmin. This ensures that any tuples deleted prior to the index
3203 : * creation or re-indexing are not relevant for conflict detection in the
3204 : * current apply worker.
3205 : *
3206 : * Note that indexes may also be excluded if they were modified by other DDL
3207 : * operations, such as ALTER INDEX. However, this is acceptable, as the
3208 : * likelihood of such DDL changes coinciding with the need to scan dead
3209 : * tuples for the update_deleted is low.
3210 : */
3211 : static bool
3212 2 : IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
3213 : TransactionId conflict_detection_xmin)
3214 : {
3215 : HeapTuple index_tuple;
3216 : TransactionId index_xmin;
3217 :
3218 2 : index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
3219 :
3220 2 : if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3221 0 : elog(ERROR, "cache lookup failed for index %u", localindexoid);
3222 :
3223 : /*
3224 : * No need to check for a frozen transaction ID, as
3225 : * TransactionIdPrecedes() manages it internally, treating it as falling
3226 : * behind the conflict_detection_xmin.
3227 : */
3228 2 : index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
3229 :
3230 2 : ReleaseSysCache(index_tuple);
3231 :
3232 2 : return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
3233 : }
3234 :
3235 : /*
3236 : * Attempts to locate a deleted tuple in the local relation that matches the
3237 : * values of the tuple received from the publication side (in 'remoteslot').
3238 : * The search is performed using either the replica identity index, primary
3239 : * key, other available index, or a sequential scan if necessary.
3240 : *
3241 : * Returns true if the deleted tuple is found. If found, the transaction ID,
3242 : * origin, and commit timestamp of the deletion are stored in '*delete_xid',
3243 : * '*delete_origin', and '*delete_time' respectively.
3244 : */
3245 : static bool
3246 22 : FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
3247 : TupleTableSlot *remoteslot,
3248 : TransactionId *delete_xid, RepOriginId *delete_origin,
3249 : TimestampTz *delete_time)
3250 : {
3251 : TransactionId oldestxmin;
3252 :
3253 : /*
3254 : * Return false if either dead tuples are not retained or commit timestamp
3255 : * data is not available.
3256 : */
3257 22 : if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
3258 16 : return false;
3259 :
3260 : /*
3261 : * For conflict detection, we use the leader worker's
3262 : * oldest_nonremovable_xid value instead of invoking
3263 : * GetOldestNonRemovableTransactionId() or using the conflict detection
3264 : * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3265 : * identify tuples that were recently deleted. These deleted tuples are no
3266 : * longer visible to concurrent transactions. However, if a remote update
3267 : * matches such a tuple, we log an update_deleted conflict.
3268 : *
3269 : * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3270 : * transaction IDs older than oldest_nonremovable_xid, for our current
3271 : * purpose, it is acceptable to treat tuples deleted by transactions prior
3272 : * to oldest_nonremovable_xid as update_missing conflicts.
3273 : */
3274 6 : if (am_leader_apply_worker())
3275 : {
3276 6 : oldestxmin = MyLogicalRepWorker->oldest_nonremovable_xid;
3277 : }
3278 : else
3279 : {
3280 : LogicalRepWorker *leader;
3281 :
3282 : /*
3283 : * Obtain the information from the leader apply worker as only the
3284 : * leader manages oldest_nonremovable_xid (see
3285 : * maybe_advance_nonremovable_xid() for details).
3286 : */
3287 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
3288 0 : leader = logicalrep_worker_find(WORKERTYPE_APPLY,
3289 0 : MyLogicalRepWorker->subid, InvalidOid,
3290 : false);
3291 0 : if (!leader)
3292 : {
3293 0 : ereport(ERROR,
3294 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3295 : errmsg("could not detect conflict as the leader apply worker has exited")));
3296 : }
3297 :
3298 0 : SpinLockAcquire(&leader->relmutex);
3299 0 : oldestxmin = leader->oldest_nonremovable_xid;
3300 0 : SpinLockRelease(&leader->relmutex);
3301 0 : LWLockRelease(LogicalRepWorkerLock);
3302 : }
3303 :
3304 : /*
3305 : * Return false if the leader apply worker has stopped retaining
3306 : * information for detecting conflicts. This implies that update_deleted
3307 : * can no longer be reliably detected.
3308 : */
3309 6 : if (!TransactionIdIsValid(oldestxmin))
3310 0 : return false;
3311 :
3312 8 : if (OidIsValid(localidxoid) &&
3313 2 : IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
3314 2 : return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
3315 : remoteslot, oldestxmin,
3316 : delete_xid, delete_origin,
3317 : delete_time);
3318 : else
3319 4 : return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
3320 : oldestxmin, delete_xid,
3321 : delete_origin, delete_time);
3322 : }
3323 :
3324 : /*
3325 : * This handles insert, update, delete on a partitioned table.
3326 : */
3327 : static void
3328 170 : apply_handle_tuple_routing(ApplyExecutionData *edata,
3329 : TupleTableSlot *remoteslot,
3330 : LogicalRepTupleData *newtup,
3331 : CmdType operation)
3332 : {
3333 170 : EState *estate = edata->estate;
3334 170 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3335 170 : ResultRelInfo *relinfo = edata->targetRelInfo;
3336 170 : Relation parentrel = relinfo->ri_RelationDesc;
3337 : ModifyTableState *mtstate;
3338 : PartitionTupleRouting *proute;
3339 : ResultRelInfo *partrelinfo;
3340 : Relation partrel;
3341 : TupleTableSlot *remoteslot_part;
3342 : TupleConversionMap *map;
3343 : MemoryContext oldctx;
3344 170 : LogicalRepRelMapEntry *part_entry = NULL;
3345 170 : AttrMap *attrmap = NULL;
3346 :
3347 : /* ModifyTableState is needed for ExecFindPartition(). */
3348 170 : edata->mtstate = mtstate = makeNode(ModifyTableState);
3349 170 : mtstate->ps.plan = NULL;
3350 170 : mtstate->ps.state = estate;
3351 170 : mtstate->operation = operation;
3352 170 : mtstate->resultRelInfo = relinfo;
3353 :
3354 : /* ... as is PartitionTupleRouting. */
3355 170 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3356 :
3357 : /*
3358 : * Find the partition to which the "search tuple" belongs.
3359 : */
3360 : Assert(remoteslot != NULL);
3361 170 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3362 170 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3363 : remoteslot, estate);
3364 : Assert(partrelinfo != NULL);
3365 170 : partrel = partrelinfo->ri_RelationDesc;
3366 :
3367 : /*
3368 : * Check for supported relkind. We need this since partitions might be of
3369 : * unsupported relkinds; and the set of partitions can change, so checking
3370 : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3371 : */
3372 170 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3373 170 : relmapentry->remoterel.relkind,
3374 170 : get_namespace_name(RelationGetNamespace(partrel)),
3375 170 : RelationGetRelationName(partrel));
3376 :
3377 : /*
3378 : * To perform any of the operations below, the tuple must match the
3379 : * partition's rowtype. Convert if needed or just copy, using a dedicated
3380 : * slot to store the tuple in any case.
3381 : */
3382 170 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3383 170 : if (remoteslot_part == NULL)
3384 104 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3385 170 : map = ExecGetRootToChildMap(partrelinfo, estate);
3386 170 : if (map != NULL)
3387 : {
3388 66 : attrmap = map->attrMap;
3389 66 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3390 : remoteslot_part);
3391 : }
3392 : else
3393 : {
3394 104 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3395 104 : slot_getallattrs(remoteslot_part);
3396 : }
3397 170 : MemoryContextSwitchTo(oldctx);
3398 :
3399 : /* Check if we can do the update or delete on the leaf partition. */
3400 170 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
3401 : {
3402 60 : part_entry = logicalrep_partition_open(relmapentry, partrel,
3403 : attrmap);
3404 60 : check_relation_updatable(part_entry);
3405 : }
3406 :
3407 170 : switch (operation)
3408 : {
3409 110 : case CMD_INSERT:
3410 110 : apply_handle_insert_internal(edata, partrelinfo,
3411 : remoteslot_part);
3412 88 : break;
3413 :
3414 34 : case CMD_DELETE:
3415 34 : apply_handle_delete_internal(edata, partrelinfo,
3416 : remoteslot_part,
3417 : part_entry->localindexoid);
3418 34 : break;
3419 :
3420 26 : case CMD_UPDATE:
3421 :
3422 : /*
3423 : * For UPDATE, depending on whether or not the updated tuple
3424 : * satisfies the partition's constraint, perform a simple UPDATE
3425 : * of the partition or move the updated tuple into a different
3426 : * suitable partition.
3427 : */
3428 : {
3429 : TupleTableSlot *localslot;
3430 : ResultRelInfo *partrelinfo_new;
3431 : Relation partrel_new;
3432 : bool found;
3433 : EPQState epqstate;
3434 26 : ConflictTupleInfo conflicttuple = {0};
3435 :
3436 : /* Get the matching local tuple from the partition. */
3437 26 : found = FindReplTupleInLocalRel(edata, partrel,
3438 : &part_entry->remoterel,
3439 : part_entry->localindexoid,
3440 : remoteslot_part, &localslot);
3441 26 : if (!found)
3442 : {
3443 : ConflictType type;
3444 4 : TupleTableSlot *newslot = localslot;
3445 :
3446 : /*
3447 : * Detecting whether the tuple was recently deleted or
3448 : * never existed is crucial to avoid misleading the user
3449 : * during conflict handling.
3450 : */
3451 4 : if (FindDeletedTupleInLocalRel(partrel,
3452 : part_entry->localindexoid,
3453 : remoteslot_part,
3454 : &conflicttuple.xmin,
3455 : &conflicttuple.origin,
3456 0 : &conflicttuple.ts) &&
3457 0 : conflicttuple.origin != replorigin_session_origin)
3458 0 : type = CT_UPDATE_DELETED;
3459 : else
3460 4 : type = CT_UPDATE_MISSING;
3461 :
3462 : /* Store the new tuple for conflict reporting */
3463 4 : slot_store_data(newslot, part_entry, newtup);
3464 :
3465 : /*
3466 : * The tuple to be updated could not be found or was
3467 : * deleted. Do nothing except for emitting a log message.
3468 : */
3469 4 : ReportApplyConflict(estate, partrelinfo, LOG,
3470 : type, remoteslot_part, newslot,
3471 4 : list_make1(&conflicttuple));
3472 :
3473 4 : return;
3474 : }
3475 :
3476 : /*
3477 : * Report the conflict if the tuple was modified by a
3478 : * different origin.
3479 : */
3480 22 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3481 : &conflicttuple.origin,
3482 2 : &conflicttuple.ts) &&
3483 2 : conflicttuple.origin != replorigin_session_origin)
3484 : {
3485 : TupleTableSlot *newslot;
3486 :
3487 : /* Store the new tuple for conflict reporting */
3488 2 : newslot = table_slot_create(partrel, &estate->es_tupleTable);
3489 2 : slot_store_data(newslot, part_entry, newtup);
3490 :
3491 2 : conflicttuple.slot = localslot;
3492 :
3493 2 : ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3494 : remoteslot_part, newslot,
3495 2 : list_make1(&conflicttuple));
3496 : }
3497 :
3498 : /*
3499 : * Apply the update to the local tuple, putting the result in
3500 : * remoteslot_part.
3501 : */
3502 22 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3503 22 : slot_modify_data(remoteslot_part, localslot, part_entry,
3504 : newtup);
3505 22 : MemoryContextSwitchTo(oldctx);
3506 :
3507 22 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3508 :
3509 : /*
3510 : * Does the updated tuple still satisfy the current
3511 : * partition's constraint?
3512 : */
3513 44 : if (!partrel->rd_rel->relispartition ||
3514 22 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3515 : false))
3516 : {
3517 : /*
3518 : * Yes, so simply UPDATE the partition. We don't call
3519 : * apply_handle_update_internal() here, which would
3520 : * normally do the following work, to avoid repeating some
3521 : * work already done above to find the local tuple in the
3522 : * partition.
3523 : */
3524 20 : InitConflictIndexes(partrelinfo);
3525 :
3526 20 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3527 20 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3528 : ACL_UPDATE);
3529 20 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3530 : localslot, remoteslot_part);
3531 : }
3532 : else
3533 : {
3534 : /* Move the tuple into the new partition. */
3535 :
3536 : /*
3537 : * New partition will be found using tuple routing, which
3538 : * can only occur via the parent table. We might need to
3539 : * convert the tuple to the parent's rowtype. Note that
3540 : * this is the tuple found in the partition, not the
3541 : * original search tuple received by this function.
3542 : */
3543 2 : if (map)
3544 : {
3545 : TupleConversionMap *PartitionToRootMap =
3546 2 : convert_tuples_by_name(RelationGetDescr(partrel),
3547 : RelationGetDescr(parentrel));
3548 :
3549 : remoteslot =
3550 2 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3551 : remoteslot_part, remoteslot);
3552 : }
3553 : else
3554 : {
3555 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3556 0 : slot_getallattrs(remoteslot);
3557 : }
3558 :
3559 : /* Find the new partition. */
3560 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3561 2 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3562 : proute, remoteslot,
3563 : estate);
3564 2 : MemoryContextSwitchTo(oldctx);
3565 : Assert(partrelinfo_new != partrelinfo);
3566 2 : partrel_new = partrelinfo_new->ri_RelationDesc;
3567 :
3568 : /* Check that new partition also has supported relkind. */
3569 2 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3570 2 : relmapentry->remoterel.relkind,
3571 2 : get_namespace_name(RelationGetNamespace(partrel_new)),
3572 2 : RelationGetRelationName(partrel_new));
3573 :
3574 : /* DELETE old tuple found in the old partition. */
3575 2 : EvalPlanQualSetSlot(&epqstate, localslot);
3576 2 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3577 2 : ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3578 :
3579 : /* INSERT new tuple into the new partition. */
3580 :
3581 : /*
3582 : * Convert the replacement tuple to match the destination
3583 : * partition rowtype.
3584 : */
3585 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3586 2 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3587 2 : if (remoteslot_part == NULL)
3588 2 : remoteslot_part = table_slot_create(partrel_new,
3589 : &estate->es_tupleTable);
3590 2 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
3591 2 : if (map != NULL)
3592 : {
3593 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3594 : remoteslot,
3595 : remoteslot_part);
3596 : }
3597 : else
3598 : {
3599 2 : remoteslot_part = ExecCopySlot(remoteslot_part,
3600 : remoteslot);
3601 2 : slot_getallattrs(remoteslot);
3602 : }
3603 2 : MemoryContextSwitchTo(oldctx);
3604 2 : apply_handle_insert_internal(edata, partrelinfo_new,
3605 : remoteslot_part);
3606 : }
3607 :
3608 22 : EvalPlanQualEnd(&epqstate);
3609 : }
3610 22 : break;
3611 :
3612 0 : default:
3613 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3614 : break;
3615 : }
3616 : }
3617 :
3618 : /*
3619 : * Handle TRUNCATE message.
3620 : *
3621 : * TODO: FDW support
3622 : */
3623 : static void
3624 40 : apply_handle_truncate(StringInfo s)
3625 : {
3626 40 : bool cascade = false;
3627 40 : bool restart_seqs = false;
3628 40 : List *remote_relids = NIL;
3629 40 : List *remote_rels = NIL;
3630 40 : List *rels = NIL;
3631 40 : List *part_rels = NIL;
3632 40 : List *relids = NIL;
3633 40 : List *relids_logged = NIL;
3634 : ListCell *lc;
3635 40 : LOCKMODE lockmode = AccessExclusiveLock;
3636 :
3637 : /*
3638 : * Quick return if we are skipping data modification changes or handling
3639 : * streamed transactions.
3640 : */
3641 80 : if (is_skipping_changes() ||
3642 40 : handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
3643 0 : return;
3644 :
3645 40 : begin_replication_step();
3646 :
3647 40 : remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3648 :
3649 98 : foreach(lc, remote_relids)
3650 : {
3651 58 : LogicalRepRelId relid = lfirst_oid(lc);
3652 : LogicalRepRelMapEntry *rel;
3653 :
3654 58 : rel = logicalrep_rel_open(relid, lockmode);
3655 58 : if (!should_apply_changes_for_rel(rel))
3656 : {
3657 : /*
3658 : * The relation can't become interesting in the middle of the
3659 : * transaction so it's safe to unlock it.
3660 : */
3661 0 : logicalrep_rel_close(rel, lockmode);
3662 0 : continue;
3663 : }
3664 :
3665 58 : remote_rels = lappend(remote_rels, rel);
3666 58 : TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
3667 58 : rels = lappend(rels, rel->localrel);
3668 58 : relids = lappend_oid(relids, rel->localreloid);
3669 58 : if (RelationIsLogicallyLogged(rel->localrel))
3670 2 : relids_logged = lappend_oid(relids_logged, rel->localreloid);
3671 :
3672 : /*
3673 : * Truncate partitions if we got a message to truncate a partitioned
3674 : * table.
3675 : */
3676 58 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3677 : {
3678 : ListCell *child;
3679 8 : List *children = find_all_inheritors(rel->localreloid,
3680 : lockmode,
3681 : NULL);
3682 :
3683 30 : foreach(child, children)
3684 : {
3685 22 : Oid childrelid = lfirst_oid(child);
3686 : Relation childrel;
3687 :
3688 22 : if (list_member_oid(relids, childrelid))
3689 8 : continue;
3690 :
3691 : /* find_all_inheritors already got lock */
3692 14 : childrel = table_open(childrelid, NoLock);
3693 :
3694 : /*
3695 : * Ignore temp tables of other backends. See similar code in
3696 : * ExecuteTruncate().
3697 : */
3698 14 : if (RELATION_IS_OTHER_TEMP(childrel))
3699 : {
3700 0 : table_close(childrel, lockmode);
3701 0 : continue;
3702 : }
3703 :
3704 14 : TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
3705 14 : rels = lappend(rels, childrel);
3706 14 : part_rels = lappend(part_rels, childrel);
3707 14 : relids = lappend_oid(relids, childrelid);
3708 : /* Log this relation only if needed for logical decoding */
3709 14 : if (RelationIsLogicallyLogged(childrel))
3710 0 : relids_logged = lappend_oid(relids_logged, childrelid);
3711 : }
3712 : }
3713 : }
3714 :
3715 : /*
3716 : * Even if we used CASCADE on the upstream primary we explicitly default
3717 : * to replaying changes without further cascading. This might be later
3718 : * changeable with a user specified option.
3719 : *
3720 : * MySubscription->runasowner tells us whether we want to execute
3721 : * replication actions as the subscription owner; the last argument to
3722 : * TruncateGuts tells it whether we want to switch to the table owner.
3723 : * Those are exactly opposite conditions.
3724 : */
3725 40 : ExecuteTruncateGuts(rels,
3726 : relids,
3727 : relids_logged,
3728 : DROP_RESTRICT,
3729 : restart_seqs,
3730 40 : !MySubscription->runasowner);
3731 98 : foreach(lc, remote_rels)
3732 : {
3733 58 : LogicalRepRelMapEntry *rel = lfirst(lc);
3734 :
3735 58 : logicalrep_rel_close(rel, NoLock);
3736 : }
3737 54 : foreach(lc, part_rels)
3738 : {
3739 14 : Relation rel = lfirst(lc);
3740 :
3741 14 : table_close(rel, NoLock);
3742 : }
3743 :
3744 40 : end_replication_step();
3745 : }
3746 :
3747 :
3748 : /*
3749 : * Logical replication protocol message dispatcher.
3750 : */
3751 : void
3752 664976 : apply_dispatch(StringInfo s)
3753 : {
3754 664976 : LogicalRepMsgType action = pq_getmsgbyte(s);
3755 : LogicalRepMsgType saved_command;
3756 :
3757 : /*
3758 : * Set the current command being applied. Since this function can be
3759 : * called recursively when applying spooled changes, save the current
3760 : * command.
3761 : */
3762 664976 : saved_command = apply_error_callback_arg.command;
3763 664976 : apply_error_callback_arg.command = action;
3764 :
3765 664976 : switch (action)
3766 : {
3767 958 : case LOGICAL_REP_MSG_BEGIN:
3768 958 : apply_handle_begin(s);
3769 958 : break;
3770 :
3771 876 : case LOGICAL_REP_MSG_COMMIT:
3772 876 : apply_handle_commit(s);
3773 874 : break;
3774 :
3775 362276 : case LOGICAL_REP_MSG_INSERT:
3776 362276 : apply_handle_insert(s);
3777 362206 : break;
3778 :
3779 132328 : case LOGICAL_REP_MSG_UPDATE:
3780 132328 : apply_handle_update(s);
3781 132312 : break;
3782 :
3783 163872 : case LOGICAL_REP_MSG_DELETE:
3784 163872 : apply_handle_delete(s);
3785 163872 : break;
3786 :
3787 40 : case LOGICAL_REP_MSG_TRUNCATE:
3788 40 : apply_handle_truncate(s);
3789 40 : break;
3790 :
3791 918 : case LOGICAL_REP_MSG_RELATION:
3792 918 : apply_handle_relation(s);
3793 918 : break;
3794 :
3795 36 : case LOGICAL_REP_MSG_TYPE:
3796 36 : apply_handle_type(s);
3797 36 : break;
3798 :
3799 14 : case LOGICAL_REP_MSG_ORIGIN:
3800 14 : apply_handle_origin(s);
3801 14 : break;
3802 :
3803 0 : case LOGICAL_REP_MSG_MESSAGE:
3804 :
3805 : /*
3806 : * Logical replication does not use generic logical messages yet.
3807 : * Although, it could be used by other applications that use this
3808 : * output plugin.
3809 : */
3810 0 : break;
3811 :
3812 1666 : case LOGICAL_REP_MSG_STREAM_START:
3813 1666 : apply_handle_stream_start(s);
3814 1666 : break;
3815 :
3816 1664 : case LOGICAL_REP_MSG_STREAM_STOP:
3817 1664 : apply_handle_stream_stop(s);
3818 1660 : break;
3819 :
3820 76 : case LOGICAL_REP_MSG_STREAM_ABORT:
3821 76 : apply_handle_stream_abort(s);
3822 76 : break;
3823 :
3824 118 : case LOGICAL_REP_MSG_STREAM_COMMIT:
3825 118 : apply_handle_stream_commit(s);
3826 114 : break;
3827 :
3828 32 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3829 32 : apply_handle_begin_prepare(s);
3830 32 : break;
3831 :
3832 30 : case LOGICAL_REP_MSG_PREPARE:
3833 30 : apply_handle_prepare(s);
3834 28 : break;
3835 :
3836 40 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
3837 40 : apply_handle_commit_prepared(s);
3838 40 : break;
3839 :
3840 10 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
3841 10 : apply_handle_rollback_prepared(s);
3842 10 : break;
3843 :
3844 22 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3845 22 : apply_handle_stream_prepare(s);
3846 22 : break;
3847 :
3848 0 : default:
3849 0 : ereport(ERROR,
3850 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3851 : errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3852 : }
3853 :
3854 : /* Reset the current command */
3855 664878 : apply_error_callback_arg.command = saved_command;
3856 664878 : }
3857 :
3858 : /*
3859 : * Figure out which write/flush positions to report to the walsender process.
3860 : *
3861 : * We can't simply report back the last LSN the walsender sent us because the
3862 : * local transaction might not yet be flushed to disk locally. Instead we
3863 : * build a list that associates local with remote LSNs for every commit. When
3864 : * reporting back the flush position to the sender we iterate that list and
3865 : * check which entries on it are already locally flushed. Those we can report
3866 : * as having been flushed.
3867 : *
3868 : * The have_pending_txes is true if there are outstanding transactions that
3869 : * need to be flushed.
3870 : */
3871 : static void
3872 135770 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3873 : bool *have_pending_txes)
3874 : {
3875 : dlist_mutable_iter iter;
3876 135770 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3877 :
3878 135770 : *write = InvalidXLogRecPtr;
3879 135770 : *flush = InvalidXLogRecPtr;
3880 :
3881 136746 : dlist_foreach_modify(iter, &lsn_mapping)
3882 : {
3883 19384 : FlushPosition *pos =
3884 19384 : dlist_container(FlushPosition, node, iter.cur);
3885 :
3886 19384 : *write = pos->remote_end;
3887 :
3888 19384 : if (pos->local_end <= local_flush)
3889 : {
3890 976 : *flush = pos->remote_end;
3891 976 : dlist_delete(iter.cur);
3892 976 : pfree(pos);
3893 : }
3894 : else
3895 : {
3896 : /*
3897 : * Don't want to uselessly iterate over the rest of the list which
3898 : * could potentially be long. Instead get the last element and
3899 : * grab the write position from there.
3900 : */
3901 18408 : pos = dlist_tail_element(FlushPosition, node,
3902 : &lsn_mapping);
3903 18408 : *write = pos->remote_end;
3904 18408 : *have_pending_txes = true;
3905 18408 : return;
3906 : }
3907 : }
3908 :
3909 117362 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3910 : }
3911 :
3912 : /*
3913 : * Store current remote/local lsn pair in the tracking list.
3914 : */
3915 : void
3916 1084 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3917 : {
3918 : FlushPosition *flushpos;
3919 :
3920 : /*
3921 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3922 : * by the leader apply worker.
3923 : */
3924 1084 : if (am_parallel_apply_worker())
3925 36 : return;
3926 :
3927 : /* Need to do this in permanent context */
3928 1048 : MemoryContextSwitchTo(ApplyContext);
3929 :
3930 : /* Track commit lsn */
3931 1048 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3932 1048 : flushpos->local_end = local_lsn;
3933 1048 : flushpos->remote_end = remote_lsn;
3934 :
3935 1048 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3936 1048 : MemoryContextSwitchTo(ApplyMessageContext);
3937 : }
3938 :
3939 :
3940 : /* Update statistics of the worker. */
3941 : static void
3942 376246 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3943 : {
3944 376246 : MyLogicalRepWorker->last_lsn = last_lsn;
3945 376246 : MyLogicalRepWorker->last_send_time = send_time;
3946 376246 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3947 376246 : if (reply)
3948 : {
3949 3274 : MyLogicalRepWorker->reply_lsn = last_lsn;
3950 3274 : MyLogicalRepWorker->reply_time = send_time;
3951 : }
3952 376246 : }
3953 :
/*
 * Apply main loop.
 *
 * Repeatedly receives messages from the publisher over
 * LogRepWorkerWalRcvConn, handles each one according to its leading type
 * byte, sends feedback, and then waits until the socket becomes readable,
 * the latch is set, or a timeout expires.  The loop is left only after
 * walrcv_receive() reports a negative length (end of stream); the function
 * then pops its error context callback and calls walrcv_endstreaming().
 *
 * 'last_received' is the starting value of the highest LSN received so far.
 * It is advanced as WAL data and keepalive messages arrive and is the
 * position handed to send_feedback().
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;
	TimeLineID	tli;
	ErrorContextCallback errcallback;
	RetainDeadTuplesData rdt_data = {0};

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/*
	 * This memory context is used for per-stream data when the streaming mode
	 * is enabled. This context is reset on each stream stop.
	 */
	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
													"LogicalStreamingContext",
													ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/*
	 * Push apply error context callback. Fields will be filled while applying
	 * a change.
	 */
	errcallback.callback = apply_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;
	apply_error_context_stack = error_context_stack;

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		/*
		 * len > 0: a message is available; len == 0: nothing available right
		 * now; len < 0: the stream has ended.
		 */
		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					break;
				}
				else if (len < 0)
				{
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					if (ConfigReloadPending)
					{
						ConfigReloadPending = false;
						ProcessConfigFile(PGC_SIGHUP);
					}

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					/*
					 * Cache the receive time so that get_candidate_xid() can
					 * reuse it instead of fetching the system time again.
					 */
					rdt_data.last_recv_time = last_recv_timestamp;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					initReadOnlyStringInfo(&s, buf, len);

					/* The first byte identifies the kind of message. */
					c = pq_getmsgbyte(&s);

					if (c == PqReplMsg_WALData)
					{
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						/* Advance last_received to the larger of both LSNs. */
						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						/* The rest of the buffer is the logical message. */
						apply_dispatch(&s);

						maybe_advance_nonremovable_xid(&rdt_data, false);
					}
					else if (c == PqReplMsg_Keepalive)
					{
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						/*
						 * The publisher's reply request is passed as 'force'
						 * so that feedback is sent when it asks for it.
						 */
						send_feedback(last_received, reply_requested, false);

						maybe_advance_nonremovable_xid(&rdt_data, false);

						UpdateWorkerStats(last_received, timestamp, true);
					}
					else if (c == PqReplMsg_PrimaryStatusUpdate)
					{
						rdt_data.remote_lsn = pq_getmsgint64(&s);
						rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
						rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
						rdt_data.reply_time = pq_getmsgint64(&s);

						/*
						 * This should never happen, see
						 * ProcessStandbyPSRequestMessage. But if it happens
						 * due to a bug, we don't want to proceed as it can
						 * incorrectly advance oldest_nonremovable_xid.
						 */
						if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
							elog(ERROR, "cannot get the latest WAL position from the publisher");

						/* 'true' signals that a status message has arrived. */
						maybe_advance_nonremovable_xid(&rdt_data, true);

						UpdateWorkerStats(last_received, rdt_data.reply_time, false);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		/*
		 * Clear the cached receive time: outside the receive loop it is no
		 * longer current, and get_candidate_xid() falls back to
		 * GetCurrentTimestamp() when it is zero.
		 */
		rdt_data.last_recv_time = 0;

		maybe_advance_nonremovable_xid(&rdt_data, false);

		if (!in_remote_transaction && !in_streamed_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/* Process any table synchronization changes. */
			ProcessSyncingRelations(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextReset(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
			break;

		/*
		 * Wait for more data or latch.  If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message).  Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		/*
		 * Ensure to wake up when it's possible to advance the non-removable
		 * transaction ID, or when the retention duration may have exceeded
		 * max_retention_duration.
		 */
		if (MySubscription->retentionactive)
		{
			if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
				rdt_data.xid_advance_interval)
				wait_time = Min(wait_time, rdt_data.xid_advance_interval);
			else if (MySubscription->maxretention > 0)
				wait_time = Min(wait_time, MySubscription->maxretention);
		}

		/* 'fd' is the socket reported by the last walrcv_receive() call. */
		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new. If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server. Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the primary anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from primary has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errcode(ERRCODE_CONNECTION_FAILURE),
							 errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			/*
			 * requestReply is also passed as 'force', so a ping is sent even
			 * when the reported positions have not changed.
			 */
			send_feedback(last_received, requestReply, requestReply);

			maybe_advance_nonremovable_xid(&rdt_data, false);

			/*
			 * Force reporting to ensure long idle periods don't lead to
			 * arbitrarily delayed stats. Stats can only be reported outside
			 * of (implicit or explicit) transactions. That shouldn't lead to
			 * stats being delayed for long, because transactions are either
			 * sent as a whole on commit or streamed. Streamed transactions
			 * are spilled to disk and applied on commit.
			 */
			if (!IsTransactionState())
				pgstat_report_stat(true);
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;
	apply_error_context_stack = error_context_stack;

	/* All done */
	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
4263 :
4264 : /*
4265 : * Send a Standby Status Update message to server.
4266 : *
4267 : * 'recvpos' is the latest LSN we've received data to, force is set if we need
4268 : * to send a response to avoid timeouts.
4269 : */
4270 : static void
4271 135744 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
4272 : {
4273 : static StringInfo reply_message = NULL;
4274 : static TimestampTz send_time = 0;
4275 :
4276 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
4277 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
4278 :
4279 : XLogRecPtr writepos;
4280 : XLogRecPtr flushpos;
4281 : TimestampTz now;
4282 : bool have_pending_txes;
4283 :
4284 : /*
4285 : * If the user doesn't want status to be reported to the publisher, be
4286 : * sure to exit before doing anything at all.
4287 : */
4288 135744 : if (!force && wal_receiver_status_interval <= 0)
4289 38556 : return;
4290 :
4291 : /* It's legal to not pass a recvpos */
4292 135744 : if (recvpos < last_recvpos)
4293 0 : recvpos = last_recvpos;
4294 :
4295 135744 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4296 :
4297 : /*
4298 : * No outstanding transactions to flush, we can report the latest received
4299 : * position. This is important for synchronous replication.
4300 : */
4301 135744 : if (!have_pending_txes)
4302 117348 : flushpos = writepos = recvpos;
4303 :
4304 135744 : if (writepos < last_writepos)
4305 0 : writepos = last_writepos;
4306 :
4307 135744 : if (flushpos < last_flushpos)
4308 18312 : flushpos = last_flushpos;
4309 :
4310 135744 : now = GetCurrentTimestamp();
4311 :
4312 : /* if we've already reported everything we're good */
4313 135744 : if (!force &&
4314 135736 : writepos == last_writepos &&
4315 39088 : flushpos == last_flushpos &&
4316 38808 : !TimestampDifferenceExceeds(send_time, now,
4317 : wal_receiver_status_interval * 1000))
4318 38556 : return;
4319 97188 : send_time = now;
4320 :
4321 97188 : if (!reply_message)
4322 : {
4323 798 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4324 :
4325 798 : reply_message = makeStringInfo();
4326 798 : MemoryContextSwitchTo(oldctx);
4327 : }
4328 : else
4329 96390 : resetStringInfo(reply_message);
4330 :
4331 97188 : pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
4332 97188 : pq_sendint64(reply_message, recvpos); /* write */
4333 97188 : pq_sendint64(reply_message, flushpos); /* flush */
4334 97188 : pq_sendint64(reply_message, writepos); /* apply */
4335 97188 : pq_sendint64(reply_message, now); /* sendTime */
4336 97188 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
4337 :
4338 97188 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4339 : force,
4340 : LSN_FORMAT_ARGS(recvpos),
4341 : LSN_FORMAT_ARGS(writepos),
4342 : LSN_FORMAT_ARGS(flushpos));
4343 :
4344 97188 : walrcv_send(LogRepWorkerWalRcvConn,
4345 : reply_message->data, reply_message->len);
4346 :
4347 97188 : if (recvpos > last_recvpos)
4348 96652 : last_recvpos = recvpos;
4349 97188 : if (writepos > last_writepos)
4350 96654 : last_writepos = writepos;
4351 97188 : if (flushpos > last_flushpos)
4352 96162 : last_flushpos = flushpos;
4353 : }
4354 :
4355 : /*
4356 : * Attempt to advance the non-removable transaction ID.
4357 : *
4358 : * See comments atop worker.c for details.
4359 : */
4360 : static void
4361 508624 : maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
4362 : bool status_received)
4363 : {
4364 508624 : if (!can_advance_nonremovable_xid(rdt_data))
4365 501586 : return;
4366 :
4367 7038 : process_rdt_phase_transition(rdt_data, status_received);
4368 : }
4369 :
4370 : /*
4371 : * Preliminary check to determine if advancing the non-removable transaction ID
4372 : * is allowed.
4373 : */
4374 : static bool
4375 508624 : can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
4376 : {
4377 : /*
4378 : * It is sufficient to manage non-removable transaction ID for a
4379 : * subscription by the main apply worker to detect update_deleted reliably
4380 : * even for table sync or parallel apply workers.
4381 : */
4382 508624 : if (!am_leader_apply_worker())
4383 572 : return false;
4384 :
4385 : /* No need to advance if retaining dead tuples is not required */
4386 508052 : if (!MySubscription->retaindeadtuples)
4387 501014 : return false;
4388 :
4389 7038 : return true;
4390 : }
4391 :
4392 : /*
4393 : * Process phase transitions during the non-removable transaction ID
4394 : * advancement. See comments atop worker.c for details of the transition.
4395 : */
4396 : static void
4397 10444 : process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
4398 : bool status_received)
4399 : {
4400 10444 : switch (rdt_data->phase)
4401 : {
4402 340 : case RDT_GET_CANDIDATE_XID:
4403 340 : get_candidate_xid(rdt_data);
4404 340 : break;
4405 3262 : case RDT_REQUEST_PUBLISHER_STATUS:
4406 3262 : request_publisher_status(rdt_data);
4407 3262 : break;
4408 6602 : case RDT_WAIT_FOR_PUBLISHER_STATUS:
4409 6602 : wait_for_publisher_status(rdt_data, status_received);
4410 6602 : break;
4411 236 : case RDT_WAIT_FOR_LOCAL_FLUSH:
4412 236 : wait_for_local_flush(rdt_data);
4413 236 : break;
4414 2 : case RDT_STOP_CONFLICT_INFO_RETENTION:
4415 2 : stop_conflict_info_retention(rdt_data);
4416 2 : break;
4417 2 : case RDT_RESUME_CONFLICT_INFO_RETENTION:
4418 2 : resume_conflict_info_retention(rdt_data);
4419 0 : break;
4420 : }
4421 10442 : }
4422 :
4423 : /*
4424 : * Workhorse for the RDT_GET_CANDIDATE_XID phase.
4425 : */
4426 : static void
4427 340 : get_candidate_xid(RetainDeadTuplesData *rdt_data)
4428 : {
4429 : TransactionId oldest_running_xid;
4430 : TimestampTz now;
4431 :
4432 : /*
4433 : * Use last_recv_time when applying changes in the loop to avoid
4434 : * unnecessary system time retrieval. If last_recv_time is not available,
4435 : * obtain the current timestamp.
4436 : */
4437 340 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4438 :
4439 : /*
4440 : * Compute the candidate_xid and request the publisher status at most once
4441 : * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4442 : * details on how this value is dynamically adjusted. This is to avoid
4443 : * using CPU and network resources without making much progress.
4444 : */
4445 340 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4446 : rdt_data->xid_advance_interval))
4447 0 : return;
4448 :
4449 : /*
4450 : * Immediately update the timer, even if the function returns later
4451 : * without setting candidate_xid due to inactivity on the subscriber. This
4452 : * avoids frequent calls to GetOldestActiveTransactionId.
4453 : */
4454 340 : rdt_data->candidate_xid_time = now;
4455 :
4456 : /*
4457 : * Consider transactions in the current database, as only dead tuples from
4458 : * this database are required for conflict detection.
4459 : */
4460 340 : oldest_running_xid = GetOldestActiveTransactionId(false, false);
4461 :
4462 : /*
4463 : * Oldest active transaction ID (oldest_running_xid) can't be behind any
4464 : * of its previously computed value.
4465 : */
4466 : Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4467 : oldest_running_xid));
4468 :
4469 : /* Return if the oldest_nonremovable_xid cannot be advanced */
4470 340 : if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4471 : oldest_running_xid))
4472 : {
4473 256 : adjust_xid_advance_interval(rdt_data, false);
4474 256 : return;
4475 : }
4476 :
4477 84 : adjust_xid_advance_interval(rdt_data, true);
4478 :
4479 84 : rdt_data->candidate_xid = oldest_running_xid;
4480 84 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4481 :
4482 : /* process the next phase */
4483 84 : process_rdt_phase_transition(rdt_data, false);
4484 : }
4485 :
4486 : /*
4487 : * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4488 : */
4489 : static void
4490 3262 : request_publisher_status(RetainDeadTuplesData *rdt_data)
4491 : {
4492 : static StringInfo request_message = NULL;
4493 :
4494 3262 : if (!request_message)
4495 : {
4496 22 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4497 :
4498 22 : request_message = makeStringInfo();
4499 22 : MemoryContextSwitchTo(oldctx);
4500 : }
4501 : else
4502 3240 : resetStringInfo(request_message);
4503 :
4504 : /*
4505 : * Send the current time to update the remote walsender's latest reply
4506 : * message received time.
4507 : */
4508 3262 : pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
4509 3262 : pq_sendint64(request_message, GetCurrentTimestamp());
4510 :
4511 3262 : elog(DEBUG2, "sending publisher status request message");
4512 :
4513 : /* Send a request for the publisher status */
4514 3262 : walrcv_send(LogRepWorkerWalRcvConn,
4515 : request_message->data, request_message->len);
4516 :
4517 3262 : rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
4518 :
4519 : /*
4520 : * Skip calling maybe_advance_nonremovable_xid() since further transition
4521 : * is possible only once we receive the publisher status message.
4522 : */
4523 3262 : }
4524 :
4525 : /*
4526 : * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
4527 : */
4528 : static void
4529 6602 : wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
4530 : bool status_received)
4531 : {
4532 : /*
4533 : * Return if we have requested but not yet received the publisher status.
4534 : */
4535 6602 : if (!status_received)
4536 3350 : return;
4537 :
4538 : /*
4539 : * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4540 : * retaining conflict information for this worker.
4541 : */
4542 3252 : if (should_stop_conflict_info_retention(rdt_data))
4543 : {
4544 0 : rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
4545 0 : return;
4546 : }
4547 :
4548 3252 : if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
4549 74 : rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4550 :
4551 : /*
4552 : * Check if all remote concurrent transactions that were active at the
4553 : * first status request have now completed. If completed, proceed to the
4554 : * next phase; otherwise, continue checking the publisher status until
4555 : * these transactions finish.
4556 : *
4557 : * It's possible that transactions in the commit phase during the last
4558 : * cycle have now finished committing, but remote_oldestxid remains older
4559 : * than remote_wait_for. This can happen if some old transaction came in
4560 : * the commit phase when we requested status in this cycle. We do not
4561 : * handle this case explicitly as it's rare and the benefit doesn't
4562 : * justify the required complexity. Tracking would require either caching
4563 : * all xids at the publisher or sending them to subscribers. The condition
4564 : * will resolve naturally once the remaining transactions are finished.
4565 : *
4566 : * Directly advancing the non-removable transaction ID is possible if
4567 : * there are no activities on the publisher since the last advancement
4568 : * cycle. However, it requires maintaining two fields, last_remote_nextxid
4569 : * and last_remote_lsn, within the structure for comparison with the
4570 : * current cycle's values. Considering the minimal cost of continuing in
4571 : * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4572 : * advance the transaction ID here.
4573 : */
4574 3252 : if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
4575 : rdt_data->remote_oldestxid))
4576 74 : rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
4577 : else
4578 3178 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4579 :
4580 : /* process the next phase */
4581 3252 : process_rdt_phase_transition(rdt_data, false);
4582 : }
4583 :
4584 : /*
4585 : * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
4586 : */
4587 : static void
4588 236 : wait_for_local_flush(RetainDeadTuplesData *rdt_data)
4589 : {
4590 : Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
4591 : TransactionIdIsValid(rdt_data->candidate_xid));
4592 :
4593 : /*
4594 : * We expect the publisher and subscriber clocks to be in sync using time
4595 : * sync service like NTP. Otherwise, we will advance this worker's
4596 : * oldest_nonremovable_xid prematurely, leading to the removal of rows
4597 : * required to detect update_deleted reliably. This check primarily
4598 : * addresses scenarios where the publisher's clock falls behind; if the
4599 : * publisher's clock is ahead, subsequent transactions will naturally bear
4600 : * later commit timestamps, conforming to the design outlined atop
4601 : * worker.c.
4602 : *
4603 : * XXX Consider waiting for the publisher's clock to catch up with the
4604 : * subscriber's before proceeding to the next phase.
4605 : */
4606 236 : if (TimestampDifferenceExceeds(rdt_data->reply_time,
4607 : rdt_data->candidate_xid_time, 0))
4608 0 : ereport(ERROR,
4609 : errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4610 : errdetail_internal("The clock on the publisher is behind that of the subscriber."));
4611 :
4612 : /*
4613 : * Do not attempt to advance the non-removable transaction ID when table
4614 : * sync is in progress. During this time, changes from a single
4615 : * transaction may be applied by multiple table sync workers corresponding
4616 : * to the target tables. So, it's necessary for all table sync workers to
4617 : * apply and flush the corresponding changes before advancing the
4618 : * transaction ID, otherwise, dead tuples that are still needed for
4619 : * conflict detection in table sync workers could be removed prematurely.
4620 : * However, confirming the apply and flush progress across all table sync
4621 : * workers is complex and not worth the effort, so we simply return if not
4622 : * all tables are in the READY state.
4623 : *
4624 : * Advancing the transaction ID is necessary even when no tables are
4625 : * currently subscribed, to avoid retaining dead tuples unnecessarily.
4626 : * While it might seem safe to skip all phases and directly assign
4627 : * candidate_xid to oldest_nonremovable_xid during the
4628 : * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
4629 : * concurrently add tables to the subscription, the apply worker may not
4630 : * process invalidations in time. Consequently,
4631 : * HasSubscriptionTablesCached() might miss the new tables, leading to
4632 : * premature advancement of oldest_nonremovable_xid.
4633 : *
4634 : * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
4635 : * invalidations are guaranteed to be processed before applying changes
4636 : * from newly added tables while waiting for the local flush to reach
4637 : * remote_lsn.
4638 : *
4639 : * Additionally, even if we check for subscription tables during
4640 : * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
4641 : * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
4642 : * subscription tables at this stage to prevent unnecessary tuple
4643 : * retention.
4644 : */
4645 236 : if (HasSubscriptionTablesCached() && !AllTablesyncsReady())
4646 : {
4647 : TimestampTz now;
4648 :
4649 64 : now = rdt_data->last_recv_time
4650 32 : ? rdt_data->last_recv_time : GetCurrentTimestamp();
4651 :
4652 : /*
4653 : * Record the time spent waiting for table sync, it is needed for the
4654 : * timeout check in should_stop_conflict_info_retention().
4655 : */
4656 32 : rdt_data->table_sync_wait_time =
4657 32 : TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);
4658 :
4659 32 : return;
4660 : }
4661 :
4662 : /*
4663 : * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4664 : * retaining conflict information for this worker.
4665 : */
4666 204 : if (should_stop_conflict_info_retention(rdt_data))
4667 : {
4668 2 : rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
4669 2 : return;
4670 : }
4671 :
4672 : /*
4673 : * Update and check the remote flush position if we are applying changes
4674 : * in a loop. This is done at most once per WalWriterDelay to avoid
4675 : * performing costly operations in get_flush_position() too frequently
4676 : * during change application.
4677 : */
4678 282 : if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
4679 80 : TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
4680 : rdt_data->last_recv_time, WalWriterDelay))
4681 : {
4682 : XLogRecPtr writepos;
4683 : XLogRecPtr flushpos;
4684 : bool have_pending_txes;
4685 :
4686 : /* Fetch the latest remote flush position */
4687 26 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4688 :
4689 26 : if (flushpos > last_flushpos)
4690 0 : last_flushpos = flushpos;
4691 :
4692 26 : rdt_data->flushpos_update_time = rdt_data->last_recv_time;
4693 : }
4694 :
4695 : /* Return to wait for the changes to be applied */
4696 202 : if (last_flushpos < rdt_data->remote_lsn)
4697 130 : return;
4698 :
4699 : /*
4700 : * Reaching this point implies should_stop_conflict_info_retention()
4701 : * returned false earlier, meaning that the most recent duration for
4702 : * advancing the non-removable transaction ID is within the
4703 : * max_retention_duration or max_retention_duration is set to 0.
4704 : *
4705 : * Therefore, if conflict info retention was previously stopped due to a
4706 : * timeout, it is now safe to resume retention.
4707 : */
4708 72 : if (!MySubscription->retentionactive)
4709 : {
4710 2 : rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
4711 2 : return;
4712 : }
4713 :
4714 : /*
4715 : * Reaching here means the remote WAL position has been received, and all
4716 : * transactions up to that position on the publisher have been applied and
4717 : * flushed locally. So, we can advance the non-removable transaction ID.
4718 : */
4719 70 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
4720 70 : MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
4721 70 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
4722 :
4723 70 : elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
4724 : LSN_FORMAT_ARGS(rdt_data->remote_lsn),
4725 : rdt_data->candidate_xid);
4726 :
4727 : /* Notify launcher to update the xmin of the conflict slot */
4728 70 : ApplyLauncherWakeup();
4729 :
4730 70 : reset_retention_data_fields(rdt_data);
4731 :
4732 : /* process the next phase */
4733 70 : process_rdt_phase_transition(rdt_data, false);
4734 : }
4735 :
4736 : /*
4737 : * Check whether conflict information retention should be stopped due to
4738 : * exceeding the maximum wait time (max_retention_duration).
4739 : *
4740 : * If retention should be stopped, return true. Otherwise, return false.
4741 : */
4742 : static bool
4743 3456 : should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4744 : {
4745 : TimestampTz now;
4746 :
4747 : Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4748 : Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
4749 : rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
4750 :
4751 3456 : if (!MySubscription->maxretention)
4752 3448 : return false;
4753 :
4754 : /*
4755 : * Use last_recv_time when applying changes in the loop to avoid
4756 : * unnecessary system time retrieval. If last_recv_time is not available,
4757 : * obtain the current timestamp.
4758 : */
4759 8 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4760 :
4761 : /*
4762 : * Return early if the wait time has not exceeded the configured maximum
4763 : * (max_retention_duration). Time spent waiting for table synchronization
4764 : * is excluded from this calculation, as it occurs infrequently.
4765 : */
4766 8 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4767 8 : MySubscription->maxretention +
4768 8 : rdt_data->table_sync_wait_time))
4769 6 : return false;
4770 :
4771 2 : return true;
4772 : }
4773 :
4774 : /*
4775 : * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
4776 : */
4777 : static void
4778 2 : stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4779 : {
4780 : /* Stop retention if not yet */
4781 2 : if (MySubscription->retentionactive)
4782 : {
4783 : /*
4784 : * If the retention status cannot be updated (e.g., due to active
4785 : * transaction), skip further processing to avoid inconsistent
4786 : * retention behavior.
4787 : */
4788 2 : if (!update_retention_status(false))
4789 0 : return;
4790 :
4791 2 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
4792 2 : MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
4793 2 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
4794 :
4795 2 : ereport(LOG,
4796 : errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
4797 : MySubscription->name),
4798 : errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
4799 : }
4800 :
4801 : Assert(!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid));
4802 :
4803 : /*
4804 : * If retention has been stopped, reset to the initial phase to retry
4805 : * resuming retention. This reset is required to recalculate the current
4806 : * wait time and resume retention if the time falls within
4807 : * max_retention_duration.
4808 : */
4809 2 : reset_retention_data_fields(rdt_data);
4810 : }
4811 :
4812 : /*
4813 : * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
4814 : */
4815 : static void
4816 2 : resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4817 : {
4818 : /* We can't resume retention without updating retention status. */
4819 2 : if (!update_retention_status(true))
4820 0 : return;
4821 :
4822 2 : ereport(LOG,
4823 : errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
4824 : MySubscription->name),
4825 : MySubscription->maxretention
4826 : ? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
4827 : : errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));
4828 :
4829 : /*
4830 : * Restart the worker to let the launcher initialize
4831 : * oldest_nonremovable_xid at startup.
4832 : *
4833 : * While it's technically possible to derive this value on-the-fly using
4834 : * the conflict detection slot's xmin, doing so risks a race condition:
4835 : * the launcher might clean slot.xmin just after retention resumes. This
4836 : * would make oldest_nonremovable_xid unreliable, especially during xid
4837 : * wraparound.
4838 : *
4839 : * Although this can be prevented by introducing heavy weight locking, the
4840 : * complexity it will bring doesn't seem worthwhile given how rarely
4841 : * retention is resumed.
4842 : */
4843 2 : apply_worker_exit();
4844 : }
4845 :
4846 : /*
4847 : * Updates pg_subscription.subretentionactive to the given value within a
4848 : * new transaction.
4849 : *
4850 : * If already inside an active transaction, skips the update and returns
4851 : * false.
4852 : *
4853 : * Returns true if the update is successfully performed.
4854 : */
4855 : static bool
4856 4 : update_retention_status(bool active)
4857 : {
4858 : /*
4859 : * Do not update the catalog during an active transaction. The transaction
4860 : * may be started during change application, leading to a possible
4861 : * rollback of catalog updates if the application fails subsequently.
4862 : */
4863 4 : if (IsTransactionState())
4864 0 : return false;
4865 :
4866 4 : StartTransactionCommand();
4867 :
4868 : /*
4869 : * Updating pg_subscription might involve TOAST table access, so ensure we
4870 : * have a valid snapshot.
4871 : */
4872 4 : PushActiveSnapshot(GetTransactionSnapshot());
4873 :
4874 : /* Update pg_subscription.subretentionactive */
4875 4 : UpdateDeadTupleRetentionStatus(MySubscription->oid, active);
4876 :
4877 4 : PopActiveSnapshot();
4878 4 : CommitTransactionCommand();
4879 :
4880 : /* Notify launcher to update the conflict slot */
4881 4 : ApplyLauncherWakeup();
4882 :
4883 4 : MySubscription->retentionactive = active;
4884 :
4885 4 : return true;
4886 : }
4887 :
4888 : /*
4889 : * Reset all data fields of RetainDeadTuplesData except those used to
4890 : * determine the timing for the next round of transaction ID advancement. We
4891 : * can even use flushpos_update_time in the next round to decide whether to get
4892 : * the latest flush position.
4893 : */
4894 : static void
4895 72 : reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
4896 : {
4897 72 : rdt_data->phase = RDT_GET_CANDIDATE_XID;
4898 72 : rdt_data->remote_lsn = InvalidXLogRecPtr;
4899 72 : rdt_data->remote_oldestxid = InvalidFullTransactionId;
4900 72 : rdt_data->remote_nextxid = InvalidFullTransactionId;
4901 72 : rdt_data->reply_time = 0;
4902 72 : rdt_data->remote_wait_for = InvalidFullTransactionId;
4903 72 : rdt_data->candidate_xid = InvalidTransactionId;
4904 72 : rdt_data->table_sync_wait_time = 0;
4905 72 : }
4906 :
4907 : /*
4908 : * Adjust the interval for advancing non-removable transaction IDs.
4909 : *
4910 : * If there is no activity on the node or retention has been stopped, we
4911 : * progressively double the interval used to advance non-removable transaction
4912 : * ID. This helps conserve CPU and network resources when there's little benefit
4913 : * to frequent updates.
4914 : *
4915 : * The interval is capped by the lowest of the following:
4916 : * - wal_receiver_status_interval (if set and retention is active),
4917 : * - a default maximum of 3 minutes,
4918 : * - max_retention_duration (if retention is active).
4919 : *
4920 : * This ensures the interval never exceeds the retention boundary, even if other
4921 : * limits are higher. Once activity resumes on the node and the retention is
4922 : * active, the interval is reset to lesser of 100ms and max_retention_duration,
4923 : * allowing timely advancement of non-removable transaction ID.
4924 : *
4925 : * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4926 : * consider the other interval or a separate GUC if the need arises.
4927 : */
4928 : static void
4929 340 : adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
4930 : {
4931 340 : if (rdt_data->xid_advance_interval && !new_xid_found)
4932 0 : {
4933 0 : int max_interval = wal_receiver_status_interval
4934 0 : ? wal_receiver_status_interval * 1000
4935 0 : : MAX_XID_ADVANCE_INTERVAL;
4936 :
4937 : /*
4938 : * No new transaction ID has been assigned since the last check, so
4939 : * double the interval, but not beyond the maximum allowable value.
4940 : */
4941 0 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4942 : max_interval);
4943 : }
4944 340 : else if (rdt_data->xid_advance_interval &&
4945 0 : !MySubscription->retentionactive)
4946 : {
4947 : /*
4948 : * Retention has been stopped, so double the interval-capped at a
4949 : * maximum of 3 minutes. The wal_receiver_status_interval is
4950 : * intentionally not used as a upper bound, since the likelihood of
4951 : * retention resuming is lower than that of general activity resuming.
4952 : */
4953 0 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4954 : MAX_XID_ADVANCE_INTERVAL);
4955 : }
4956 : else
4957 : {
4958 : /*
4959 : * A new transaction ID was found or the interval is not yet
4960 : * initialized, so set the interval to the minimum value.
4961 : */
4962 340 : rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
4963 : }
4964 :
4965 : /*
4966 : * Ensure the wait time remains within the maximum retention time limit
4967 : * when retention is active.
4968 : */
4969 340 : if (MySubscription->retentionactive)
4970 338 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
4971 : MySubscription->maxretention);
4972 340 : }
4973 :
4974 : /*
4975 : * Exit routine for apply workers due to subscription parameter changes.
4976 : */
4977 : static void
4978 90 : apply_worker_exit(void)
4979 : {
4980 90 : if (am_parallel_apply_worker())
4981 : {
4982 : /*
4983 : * Don't stop the parallel apply worker as the leader will detect the
4984 : * subscription parameter change and restart logical replication later
4985 : * anyway. This also prevents the leader from reporting errors when
4986 : * trying to communicate with a stopped parallel apply worker, which
4987 : * would accidentally disable subscriptions if disable_on_error was
4988 : * set.
4989 : */
4990 0 : return;
4991 : }
4992 :
4993 : /*
4994 : * Reset the last-start time for this apply worker so that the launcher
4995 : * will restart it without waiting for wal_retrieve_retry_interval if the
4996 : * subscription is still active, and so that we won't leak that hash table
4997 : * entry if it isn't.
4998 : */
4999 90 : if (am_leader_apply_worker())
5000 90 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5001 :
5002 90 : proc_exit(0);
5003 : }
5004 :
5005 : /*
5006 : * Reread subscription info if needed.
5007 : *
5008 : * For significant changes, we react by exiting the current process; a new
5009 : * one will be launched afterwards if needed.
5010 : */
5011 : void
5012 10508 : maybe_reread_subscription(void)
5013 : {
5014 : MemoryContext oldctx;
5015 : Subscription *newsub;
5016 10508 : bool started_tx = false;
5017 :
5018 : /* When cache state is valid there is nothing to do here. */
5019 10508 : if (MySubscriptionValid)
5020 10328 : return;
5021 :
5022 : /* This function might be called inside or outside of transaction. */
5023 180 : if (!IsTransactionState())
5024 : {
5025 170 : StartTransactionCommand();
5026 170 : started_tx = true;
5027 : }
5028 :
5029 : /* Ensure allocations in permanent context. */
5030 180 : oldctx = MemoryContextSwitchTo(ApplyContext);
5031 :
5032 180 : newsub = GetSubscription(MyLogicalRepWorker->subid, true);
5033 :
5034 : /*
5035 : * Exit if the subscription was removed. This normally should not happen
5036 : * as the worker gets killed during DROP SUBSCRIPTION.
5037 : */
5038 180 : if (!newsub)
5039 : {
5040 0 : ereport(LOG,
5041 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5042 : MySubscription->name)));
5043 :
5044 : /* Ensure we remove no-longer-useful entry for worker's start time */
5045 0 : if (am_leader_apply_worker())
5046 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5047 :
5048 0 : proc_exit(0);
5049 : }
5050 :
5051 : /* Exit if the subscription was disabled. */
5052 180 : if (!newsub->enabled)
5053 : {
5054 28 : ereport(LOG,
5055 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5056 : MySubscription->name)));
5057 :
5058 28 : apply_worker_exit();
5059 : }
5060 :
5061 : /* !slotname should never happen when enabled is true. */
5062 : Assert(newsub->slotname);
5063 :
5064 : /* two-phase cannot be altered while the worker is running */
5065 : Assert(newsub->twophasestate == MySubscription->twophasestate);
5066 :
5067 : /*
5068 : * Exit if any parameter that affects the remote connection was changed.
5069 : * The launcher will start a new worker but note that the parallel apply
5070 : * worker won't restart if the streaming option's value is changed from
5071 : * 'parallel' to any other value or the server decides not to stream the
5072 : * in-progress transaction.
5073 : */
5074 152 : if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
5075 148 : strcmp(newsub->name, MySubscription->name) != 0 ||
5076 146 : strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
5077 146 : newsub->binary != MySubscription->binary ||
5078 134 : newsub->stream != MySubscription->stream ||
5079 124 : newsub->passwordrequired != MySubscription->passwordrequired ||
5080 124 : strcmp(newsub->origin, MySubscription->origin) != 0 ||
5081 122 : newsub->owner != MySubscription->owner ||
5082 120 : !equal(newsub->publications, MySubscription->publications))
5083 : {
5084 52 : if (am_parallel_apply_worker())
5085 0 : ereport(LOG,
5086 : (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5087 : MySubscription->name)));
5088 : else
5089 52 : ereport(LOG,
5090 : (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
5091 : MySubscription->name)));
5092 :
5093 52 : apply_worker_exit();
5094 : }
5095 :
5096 : /*
5097 : * Exit if the subscription owner's superuser privileges have been
5098 : * revoked.
5099 : */
5100 100 : if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
5101 : {
5102 8 : if (am_parallel_apply_worker())
5103 0 : ereport(LOG,
5104 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5105 : MySubscription->name));
5106 : else
5107 8 : ereport(LOG,
5108 : errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5109 : MySubscription->name));
5110 :
5111 8 : apply_worker_exit();
5112 : }
5113 :
5114 : /* Check for other changes that should never happen too. */
5115 92 : if (newsub->dbid != MySubscription->dbid)
5116 : {
5117 0 : elog(ERROR, "subscription %u changed unexpectedly",
5118 : MyLogicalRepWorker->subid);
5119 : }
5120 :
5121 : /* Clean old subscription info and switch to new one. */
5122 92 : FreeSubscription(MySubscription);
5123 92 : MySubscription = newsub;
5124 :
5125 92 : MemoryContextSwitchTo(oldctx);
5126 :
5127 : /* Change synchronous commit according to the user's wishes */
5128 92 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
5129 : PGC_BACKEND, PGC_S_OVERRIDE);
5130 :
5131 92 : if (started_tx)
5132 88 : CommitTransactionCommand();
5133 :
5134 92 : MySubscriptionValid = true;
5135 : }
5136 :
5137 : /*
5138 : * Callback from subscription syscache invalidation.
5139 : */
5140 : static void
5141 188 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
5142 : {
5143 188 : MySubscriptionValid = false;
5144 188 : }
5145 :
5146 : /*
5147 : * subxact_info_write
5148 : * Store information about subxacts for a toplevel transaction.
5149 : *
5150 : * For each subxact we store offset of its first change in the main file.
5151 : * The file is always over-written as a whole.
5152 : *
5153 : * XXX We should only store subxacts that were not aborted yet.
5154 : */
5155 : static void
5156 744 : subxact_info_write(Oid subid, TransactionId xid)
5157 : {
5158 : char path[MAXPGPATH];
5159 : Size len;
5160 : BufFile *fd;
5161 :
5162 : Assert(TransactionIdIsValid(xid));
5163 :
5164 : /* construct the subxact filename */
5165 744 : subxact_filename(path, subid, xid);
5166 :
5167 : /* Delete the subxacts file, if exists. */
5168 744 : if (subxact_data.nsubxacts == 0)
5169 : {
5170 580 : cleanup_subxact_info();
5171 580 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5172 :
5173 580 : return;
5174 : }
5175 :
5176 : /*
5177 : * Create the subxact file if it not already created, otherwise open the
5178 : * existing file.
5179 : */
5180 164 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
5181 : true);
5182 164 : if (fd == NULL)
5183 16 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
5184 :
5185 164 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
5186 :
5187 : /* Write the subxact count and subxact info */
5188 164 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
5189 164 : BufFileWrite(fd, subxact_data.subxacts, len);
5190 :
5191 164 : BufFileClose(fd);
5192 :
5193 : /* free the memory allocated for subxact info */
5194 164 : cleanup_subxact_info();
5195 : }
5196 :
5197 : /*
5198 : * subxact_info_read
5199 : * Restore information about subxacts of a streamed transaction.
5200 : *
5201 : * Read information about subxacts into the structure subxact_data that can be
5202 : * used later.
5203 : */
5204 : static void
5205 688 : subxact_info_read(Oid subid, TransactionId xid)
5206 : {
5207 : char path[MAXPGPATH];
5208 : Size len;
5209 : BufFile *fd;
5210 : MemoryContext oldctx;
5211 :
5212 : Assert(!subxact_data.subxacts);
5213 : Assert(subxact_data.nsubxacts == 0);
5214 : Assert(subxact_data.nsubxacts_max == 0);
5215 :
5216 : /*
5217 : * If the subxact file doesn't exist that means we don't have any subxact
5218 : * info.
5219 : */
5220 688 : subxact_filename(path, subid, xid);
5221 688 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
5222 : true);
5223 688 : if (fd == NULL)
5224 530 : return;
5225 :
5226 : /* read number of subxact items */
5227 158 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
5228 :
5229 158 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
5230 :
5231 : /* we keep the maximum as a power of 2 */
5232 158 : subxact_data.nsubxacts_max = 1 << pg_ceil_log2_32(subxact_data.nsubxacts);
5233 :
5234 : /*
5235 : * Allocate subxact information in the logical streaming context. We need
5236 : * this information during the complete stream so that we can add the sub
5237 : * transaction info to this. On stream stop we will flush this information
5238 : * to the subxact file and reset the logical streaming context.
5239 : */
5240 158 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
5241 158 : subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
5242 : sizeof(SubXactInfo));
5243 158 : MemoryContextSwitchTo(oldctx);
5244 :
5245 158 : if (len > 0)
5246 158 : BufFileReadExact(fd, subxact_data.subxacts, len);
5247 :
5248 158 : BufFileClose(fd);
5249 : }
5250 :
5251 : /*
5252 : * subxact_info_add
5253 : * Add information about a subxact (offset in the main file).
5254 : */
5255 : static void
5256 205026 : subxact_info_add(TransactionId xid)
5257 : {
5258 205026 : SubXactInfo *subxacts = subxact_data.subxacts;
5259 : int64 i;
5260 :
5261 : /* We must have a valid top level stream xid and a stream fd. */
5262 : Assert(TransactionIdIsValid(stream_xid));
5263 : Assert(stream_fd != NULL);
5264 :
5265 : /*
5266 : * If the XID matches the toplevel transaction, we don't want to add it.
5267 : */
5268 205026 : if (stream_xid == xid)
5269 184778 : return;
5270 :
5271 : /*
5272 : * In most cases we're checking the same subxact as we've already seen in
5273 : * the last call, so make sure to ignore it (this change comes later).
5274 : */
5275 20248 : if (subxact_data.subxact_last == xid)
5276 20096 : return;
5277 :
5278 : /* OK, remember we're processing this XID. */
5279 152 : subxact_data.subxact_last = xid;
5280 :
5281 : /*
5282 : * Check if the transaction is already present in the array of subxact. We
5283 : * intentionally scan the array from the tail, because we're likely adding
5284 : * a change for the most recent subtransactions.
5285 : *
5286 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5287 : * would allow us to use binary search here.
5288 : */
5289 190 : for (i = subxact_data.nsubxacts; i > 0; i--)
5290 : {
5291 : /* found, so we're done */
5292 152 : if (subxacts[i - 1].xid == xid)
5293 114 : return;
5294 : }
5295 :
5296 : /* This is a new subxact, so we need to add it to the array. */
5297 38 : if (subxact_data.nsubxacts == 0)
5298 : {
5299 : MemoryContext oldctx;
5300 :
5301 16 : subxact_data.nsubxacts_max = 128;
5302 :
5303 : /*
5304 : * Allocate this memory for subxacts in per-stream context, see
5305 : * subxact_info_read.
5306 : */
5307 16 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
5308 16 : subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
5309 16 : MemoryContextSwitchTo(oldctx);
5310 : }
5311 22 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
5312 : {
5313 20 : subxact_data.nsubxacts_max *= 2;
5314 20 : subxacts = repalloc(subxacts,
5315 20 : subxact_data.nsubxacts_max * sizeof(SubXactInfo));
5316 : }
5317 :
5318 38 : subxacts[subxact_data.nsubxacts].xid = xid;
5319 :
5320 : /*
5321 : * Get the current offset of the stream file and store it as offset of
5322 : * this subxact.
5323 : */
5324 38 : BufFileTell(stream_fd,
5325 38 : &subxacts[subxact_data.nsubxacts].fileno,
5326 38 : &subxacts[subxact_data.nsubxacts].offset);
5327 :
5328 38 : subxact_data.nsubxacts++;
5329 38 : subxact_data.subxacts = subxacts;
5330 : }
5331 :
5332 : /* format filename for file containing the info about subxacts */
5333 : static inline void
5334 1494 : subxact_filename(char *path, Oid subid, TransactionId xid)
5335 : {
5336 1494 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5337 1494 : }
5338 :
5339 : /* format filename for file containing serialized changes */
5340 : static inline void
5341 876 : changes_filename(char *path, Oid subid, TransactionId xid)
5342 : {
5343 876 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5344 876 : }
5345 :
5346 : /*
5347 : * stream_cleanup_files
5348 : * Cleanup files for a subscription / toplevel transaction.
5349 : *
5350 : * Remove files with serialized changes and subxact info for a particular
5351 : * toplevel transaction. Each subscription has a separate set of files
5352 : * for any toplevel transaction.
5353 : */
5354 : void
5355 62 : stream_cleanup_files(Oid subid, TransactionId xid)
5356 : {
5357 : char path[MAXPGPATH];
5358 :
5359 : /* Delete the changes file. */
5360 62 : changes_filename(path, subid, xid);
5361 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
5362 :
5363 : /* Delete the subxact file, if it exists. */
5364 62 : subxact_filename(path, subid, xid);
5365 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5366 62 : }
5367 :
5368 : /*
5369 : * stream_open_file
5370 : * Open a file that we'll use to serialize changes for a toplevel
5371 : * transaction.
5372 : *
5373 : * Open a file for streamed changes from a toplevel transaction identified
5374 : * by stream_xid (global variable). If it's the first chunk of streamed
5375 : * changes for this transaction, create the buffile, otherwise open the
5376 : * previously created file.
5377 : */
5378 : static void
5379 726 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
5380 : {
5381 : char path[MAXPGPATH];
5382 : MemoryContext oldcxt;
5383 :
5384 : Assert(OidIsValid(subid));
5385 : Assert(TransactionIdIsValid(xid));
5386 : Assert(stream_fd == NULL);
5387 :
5388 :
5389 726 : changes_filename(path, subid, xid);
5390 726 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5391 :
5392 : /*
5393 : * Create/open the buffiles under the logical streaming context so that we
5394 : * have those files until stream stop.
5395 : */
5396 726 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
5397 :
5398 : /*
5399 : * If this is the first streamed segment, create the changes file.
5400 : * Otherwise, just open the file for writing, in append mode.
5401 : */
5402 726 : if (first_segment)
5403 64 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
5404 : path);
5405 : else
5406 : {
5407 : /*
5408 : * Open the file and seek to the end of the file because we always
5409 : * append the changes file.
5410 : */
5411 662 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
5412 : path, O_RDWR, false);
5413 662 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
5414 : }
5415 :
5416 726 : MemoryContextSwitchTo(oldcxt);
5417 726 : }
5418 :
5419 : /*
5420 : * stream_close_file
5421 : * Close the currently open file with streamed changes.
5422 : */
5423 : static void
5424 786 : stream_close_file(void)
5425 : {
5426 : Assert(stream_fd != NULL);
5427 :
5428 786 : BufFileClose(stream_fd);
5429 :
5430 786 : stream_fd = NULL;
5431 786 : }
5432 :
5433 : /*
5434 : * stream_write_change
5435 : * Serialize a change to a file for the current toplevel transaction.
5436 : *
5437 : * The change is serialized in a simple format, with length (not including
5438 : * the length), action code (identifying the message type) and message
5439 : * contents (without the subxact TransactionId value).
5440 : */
5441 : static void
5442 215108 : stream_write_change(char action, StringInfo s)
5443 : {
5444 : int len;
5445 :
5446 : Assert(stream_fd != NULL);
5447 :
5448 : /* total on-disk size, including the action type character */
5449 215108 : len = (s->len - s->cursor) + sizeof(char);
5450 :
5451 : /* first write the size */
5452 215108 : BufFileWrite(stream_fd, &len, sizeof(len));
5453 :
5454 : /* then the action */
5455 215108 : BufFileWrite(stream_fd, &action, sizeof(action));
5456 :
5457 : /* and finally the remaining part of the buffer (after the XID) */
5458 215108 : len = (s->len - s->cursor);
5459 :
5460 215108 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
5461 215108 : }
5462 :
5463 : /*
5464 : * stream_open_and_write_change
5465 : * Serialize a message to a file for the given transaction.
5466 : *
5467 : * This function is similar to stream_write_change except that it will open the
5468 : * target file if not already before writing the message and close the file at
5469 : * the end.
5470 : */
5471 : static void
5472 10 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
5473 : {
5474 : Assert(!in_streamed_transaction);
5475 :
5476 10 : if (!stream_fd)
5477 10 : stream_start_internal(xid, false);
5478 :
5479 10 : stream_write_change(action, s);
5480 10 : stream_stop_internal(xid);
5481 10 : }
5482 :
5483 : /*
5484 : * Sets streaming options including replication slot name and origin start
5485 : * position. Workers need these options for logical replication.
5486 : */
5487 : void
5488 798 : set_stream_options(WalRcvStreamOptions *options,
5489 : char *slotname,
5490 : XLogRecPtr *origin_startpos)
5491 : {
5492 : int server_version;
5493 :
5494 798 : options->logical = true;
5495 798 : options->startpoint = *origin_startpos;
5496 798 : options->slotname = slotname;
5497 :
5498 798 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
5499 798 : options->proto.logical.proto_version =
5500 798 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
5501 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
5502 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
5503 : LOGICALREP_PROTO_VERSION_NUM;
5504 :
5505 798 : options->proto.logical.publication_names = MySubscription->publications;
5506 798 : options->proto.logical.binary = MySubscription->binary;
5507 :
5508 : /*
5509 : * Assign the appropriate option value for streaming option according to
5510 : * the 'streaming' mode and the publisher's ability to support that mode.
5511 : */
5512 798 : if (server_version >= 160000 &&
5513 798 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
5514 : {
5515 730 : options->proto.logical.streaming_str = "parallel";
5516 730 : MyLogicalRepWorker->parallel_apply = true;
5517 : }
5518 68 : else if (server_version >= 140000 &&
5519 68 : MySubscription->stream != LOGICALREP_STREAM_OFF)
5520 : {
5521 52 : options->proto.logical.streaming_str = "on";
5522 52 : MyLogicalRepWorker->parallel_apply = false;
5523 : }
5524 : else
5525 : {
5526 16 : options->proto.logical.streaming_str = NULL;
5527 16 : MyLogicalRepWorker->parallel_apply = false;
5528 : }
5529 :
5530 798 : options->proto.logical.twophase = false;
5531 798 : options->proto.logical.origin = pstrdup(MySubscription->origin);
5532 798 : }
5533 :
5534 : /*
5535 : * Cleanup the memory for subxacts and reset the related variables.
5536 : */
5537 : static inline void
5538 752 : cleanup_subxact_info()
5539 : {
5540 752 : if (subxact_data.subxacts)
5541 174 : pfree(subxact_data.subxacts);
5542 :
5543 752 : subxact_data.subxacts = NULL;
5544 752 : subxact_data.subxact_last = InvalidTransactionId;
5545 752 : subxact_data.nsubxacts = 0;
5546 752 : subxact_data.nsubxacts_max = 0;
5547 752 : }
5548 :
5549 : /*
5550 : * Common function to run the apply loop with error handling. Disable the
5551 : * subscription, if necessary.
5552 : *
5553 : * Note that we don't handle FATAL errors which are probably because
5554 : * of system resource error and are not repeatable.
5555 : */
5556 : void
5557 798 : start_apply(XLogRecPtr origin_startpos)
5558 : {
5559 798 : PG_TRY();
5560 : {
5561 798 : LogicalRepApplyLoop(origin_startpos);
5562 : }
5563 140 : PG_CATCH();
5564 : {
5565 : /*
5566 : * Reset the origin state to prevent the advancement of origin
5567 : * progress if we fail to apply. Otherwise, this will result in
5568 : * transaction loss as that transaction won't be sent again by the
5569 : * server.
5570 : */
5571 140 : replorigin_reset(0, (Datum) 0);
5572 :
5573 140 : if (MySubscription->disableonerr)
5574 6 : DisableSubscriptionAndExit();
5575 : else
5576 : {
5577 : /*
5578 : * Report the worker failed while applying changes. Abort the
5579 : * current transaction so that the stats message is sent in an
5580 : * idle state.
5581 : */
5582 134 : AbortOutOfAnyTransaction();
5583 134 : pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
5584 :
5585 134 : PG_RE_THROW();
5586 : }
5587 : }
5588 0 : PG_END_TRY();
5589 0 : }
5590 :
5591 : /*
5592 : * Runs the leader apply worker.
5593 : *
5594 : * It sets up replication origin, streaming options and then starts streaming.
5595 : */
5596 : static void
5597 510 : run_apply_worker()
5598 : {
5599 : char originname[NAMEDATALEN];
5600 510 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
5601 510 : char *slotname = NULL;
5602 : WalRcvStreamOptions options;
5603 : RepOriginId originid;
5604 : TimeLineID startpointTLI;
5605 : char *err;
5606 : bool must_use_password;
5607 :
5608 510 : slotname = MySubscription->slotname;
5609 :
5610 : /*
5611 : * This shouldn't happen if the subscription is enabled, but guard against
5612 : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5613 : * slot is NULL.)
5614 : */
5615 510 : if (!slotname)
5616 0 : ereport(ERROR,
5617 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5618 : errmsg("subscription has no replication slot set")));
5619 :
5620 : /* Setup replication origin tracking. */
5621 510 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
5622 : originname, sizeof(originname));
5623 510 : StartTransactionCommand();
5624 510 : originid = replorigin_by_name(originname, true);
5625 510 : if (!OidIsValid(originid))
5626 0 : originid = replorigin_create(originname);
5627 510 : replorigin_session_setup(originid, 0);
5628 510 : replorigin_session_origin = originid;
5629 510 : origin_startpos = replorigin_session_get_progress(false);
5630 510 : CommitTransactionCommand();
5631 :
5632 : /* Is the use of a password mandatory? */
5633 984 : must_use_password = MySubscription->passwordrequired &&
5634 474 : !MySubscription->ownersuperuser;
5635 :
5636 510 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
5637 : true, must_use_password,
5638 : MySubscription->name, &err);
5639 :
5640 484 : if (LogRepWorkerWalRcvConn == NULL)
5641 54 : ereport(ERROR,
5642 : (errcode(ERRCODE_CONNECTION_FAILURE),
5643 : errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5644 : MySubscription->name, err)));
5645 :
5646 : /*
5647 : * We don't really use the output identify_system for anything but it does
5648 : * some initializations on the upstream so let's still call it.
5649 : */
5650 430 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
5651 :
5652 430 : set_apply_error_context_origin(originname);
5653 :
5654 430 : set_stream_options(&options, slotname, &origin_startpos);
5655 :
5656 : /*
5657 : * Even when the two_phase mode is requested by the user, it remains as
5658 : * the tri-state PENDING until all tablesyncs have reached READY state.
5659 : * Only then, can it become ENABLED.
5660 : *
5661 : * Note: If the subscription has no tables then leave the state as
5662 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5663 : * work.
5664 : */
5665 462 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
5666 32 : AllTablesyncsReady())
5667 : {
5668 : /* Start streaming with two_phase enabled */
5669 18 : options.proto.logical.twophase = true;
5670 18 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5671 :
5672 18 : StartTransactionCommand();
5673 :
5674 : /*
5675 : * Updating pg_subscription might involve TOAST table access, so
5676 : * ensure we have a valid snapshot.
5677 : */
5678 18 : PushActiveSnapshot(GetTransactionSnapshot());
5679 :
5680 18 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
5681 18 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
5682 18 : PopActiveSnapshot();
5683 18 : CommitTransactionCommand();
5684 : }
5685 : else
5686 : {
5687 412 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5688 : }
5689 :
5690 430 : ereport(DEBUG1,
5691 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5692 : MySubscription->name,
5693 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
5694 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
5695 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
5696 : "?")));
5697 :
5698 : /* Run the main loop. */
5699 430 : start_apply(origin_startpos);
5700 0 : }
5701 :
5702 : /*
5703 : * Common initialization for leader apply worker, parallel apply worker and
5704 : * tablesync worker.
5705 : *
5706 : * Initialize the database connection, in-memory subscription and necessary
5707 : * config options.
5708 : */
5709 : void
5710 1050 : InitializeLogRepWorker(void)
5711 : {
5712 : MemoryContext oldctx;
5713 :
5714 : /* Run as replica session replication role. */
5715 1050 : SetConfigOption("session_replication_role", "replica",
5716 : PGC_SUSET, PGC_S_OVERRIDE);
5717 :
5718 : /* Connect to our database. */
5719 1050 : BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
5720 1050 : MyLogicalRepWorker->userid,
5721 : 0);
5722 :
5723 : /*
5724 : * Set always-secure search path, so malicious users can't redirect user
5725 : * code (e.g. pg_index.indexprs).
5726 : */
5727 1044 : SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5728 :
5729 : /* Load the subscription into persistent memory context. */
5730 1044 : ApplyContext = AllocSetContextCreate(TopMemoryContext,
5731 : "ApplyContext",
5732 : ALLOCSET_DEFAULT_SIZES);
5733 1044 : StartTransactionCommand();
5734 1044 : oldctx = MemoryContextSwitchTo(ApplyContext);
5735 :
5736 : /*
5737 : * Lock the subscription to prevent it from being concurrently dropped,
5738 : * then re-verify its existence. After the initialization, the worker will
5739 : * be terminated gracefully if the subscription is dropped.
5740 : */
5741 1044 : LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
5742 : AccessShareLock);
5743 1044 : MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
5744 1044 : if (!MySubscription)
5745 : {
5746 122 : ereport(LOG,
5747 : (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5748 : MyLogicalRepWorker->subid)));
5749 :
5750 : /* Ensure we remove no-longer-useful entry for worker's start time */
5751 122 : if (am_leader_apply_worker())
5752 122 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5753 :
5754 122 : proc_exit(0);
5755 : }
5756 :
5757 922 : MySubscriptionValid = true;
5758 922 : MemoryContextSwitchTo(oldctx);
5759 :
5760 922 : if (!MySubscription->enabled)
5761 : {
5762 0 : ereport(LOG,
5763 : (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5764 : MySubscription->name)));
5765 :
5766 0 : apply_worker_exit();
5767 : }
5768 :
5769 : /*
5770 : * Restart the worker if retain_dead_tuples was enabled during startup.
5771 : *
5772 : * At this point, the replication slot used for conflict detection might
5773 : * not exist yet, or could be dropped soon if the launcher perceives
5774 : * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5775 : * oldest_nonremovable_xid when the slot is absent or at risk of being
5776 : * dropped, a restart is initiated.
5777 : *
5778 : * The oldest_nonremovable_xid should be initialized only when the
5779 : * subscription's retention is active before launching the worker. See
5780 : * logicalrep_worker_launch.
5781 : */
5782 922 : if (am_leader_apply_worker() &&
5783 510 : MySubscription->retaindeadtuples &&
5784 28 : MySubscription->retentionactive &&
5785 28 : !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
5786 : {
5787 0 : ereport(LOG,
5788 : errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5789 : MySubscription->name, "retain_dead_tuples"));
5790 :
5791 0 : apply_worker_exit();
5792 : }
5793 :
5794 : /* Setup synchronous commit according to the user's wishes */
5795 922 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
5796 : PGC_BACKEND, PGC_S_OVERRIDE);
5797 :
5798 : /*
5799 : * Keep us informed about subscription or role changes. Note that the
5800 : * role's superuser privilege can be revoked.
5801 : */
5802 922 : CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
5803 : subscription_change_cb,
5804 : (Datum) 0);
5805 :
5806 922 : CacheRegisterSyscacheCallback(AUTHOID,
5807 : subscription_change_cb,
5808 : (Datum) 0);
5809 :
5810 922 : if (am_tablesync_worker())
5811 392 : ereport(LOG,
5812 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5813 : MySubscription->name,
5814 : get_rel_name(MyLogicalRepWorker->relid))));
5815 : else
5816 530 : ereport(LOG,
5817 : (errmsg("logical replication apply worker for subscription \"%s\" has started",
5818 : MySubscription->name)));
5819 :
5820 922 : CommitTransactionCommand();
5821 922 : }
5822 :
5823 : /*
5824 : * Reset the origin state.
5825 : */
5826 : static void
5827 1042 : replorigin_reset(int code, Datum arg)
5828 : {
5829 1042 : replorigin_session_origin = InvalidRepOriginId;
5830 1042 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
5831 1042 : replorigin_session_origin_timestamp = 0;
5832 1042 : }
5833 :
5834 : /* Common function to setup the leader apply or tablesync worker. */
5835 : void
5836 1030 : SetupApplyOrSyncWorker(int worker_slot)
5837 : {
5838 : /* Attach to slot */
5839 1030 : logicalrep_worker_attach(worker_slot);
5840 :
5841 : Assert(am_tablesync_worker() || am_leader_apply_worker());
5842 :
5843 : /* Setup signal handling */
5844 1030 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
5845 1030 : pqsignal(SIGTERM, die);
5846 1030 : BackgroundWorkerUnblockSignals();
5847 :
5848 : /*
5849 : * We don't currently need any ResourceOwner in a walreceiver process, but
5850 : * if we did, we could call CreateAuxProcessResourceOwner here.
5851 : */
5852 :
5853 : /* Initialise stats to a sanish value */
5854 1030 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
5855 1030 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
5856 :
5857 : /* Load the libpq-specific functions */
5858 1030 : load_file("libpqwalreceiver", false);
5859 :
5860 1030 : InitializeLogRepWorker();
5861 :
5862 : /*
5863 : * Register a callback to reset the origin state before aborting any
5864 : * pending transaction during shutdown (see ShutdownPostgres()). This will
5865 : * avoid origin advancement for an in-complete transaction which could
5866 : * otherwise lead to its loss as such a transaction won't be sent by the
5867 : * server again.
5868 : *
5869 : * Note that even a LOG or DEBUG statement placed after setting the origin
5870 : * state may process a shutdown signal before committing the current apply
5871 : * operation. So, it is important to register such a callback here.
5872 : */
5873 902 : before_shmem_exit(replorigin_reset, (Datum) 0);
5874 :
5875 : /* Connect to the origin and start the replication. */
5876 902 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5877 : MySubscription->conninfo);
5878 :
5879 : /*
5880 : * Setup callback for syscache so that we know when something changes in
5881 : * the subscription relation state.
5882 : */
5883 902 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
5884 : InvalidateSyncingRelStates,
5885 : (Datum) 0);
5886 902 : }
5887 :
5888 : /* Logical Replication Apply worker entry point */
5889 : void
5890 638 : ApplyWorkerMain(Datum main_arg)
5891 : {
5892 638 : int worker_slot = DatumGetInt32(main_arg);
5893 :
5894 638 : InitializingApplyWorker = true;
5895 :
5896 638 : SetupApplyOrSyncWorker(worker_slot);
5897 :
5898 510 : InitializingApplyWorker = false;
5899 :
5900 510 : run_apply_worker();
5901 :
5902 0 : proc_exit(0);
5903 : }
5904 :
5905 : /*
5906 : * After error recovery, disable the subscription in a new transaction
5907 : * and exit cleanly.
5908 : */
5909 : void
5910 8 : DisableSubscriptionAndExit(void)
5911 : {
5912 : /*
5913 : * Emit the error message, and recover from the error state to an idle
5914 : * state
5915 : */
5916 8 : HOLD_INTERRUPTS();
5917 :
5918 8 : EmitErrorReport();
5919 8 : AbortOutOfAnyTransaction();
5920 8 : FlushErrorState();
5921 :
5922 8 : RESUME_INTERRUPTS();
5923 :
5924 : /* Report the worker failed during either table synchronization or apply */
5925 8 : pgstat_report_subscription_error(MyLogicalRepWorker->subid,
5926 8 : !am_tablesync_worker());
5927 :
5928 : /* Disable the subscription */
5929 8 : StartTransactionCommand();
5930 :
5931 : /*
5932 : * Updating pg_subscription might involve TOAST table access, so ensure we
5933 : * have a valid snapshot.
5934 : */
5935 8 : PushActiveSnapshot(GetTransactionSnapshot());
5936 :
5937 8 : DisableSubscription(MySubscription->oid);
5938 8 : PopActiveSnapshot();
5939 8 : CommitTransactionCommand();
5940 :
5941 : /* Ensure we remove no-longer-useful entry for worker's start time */
5942 8 : if (am_leader_apply_worker())
5943 6 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5944 :
5945 : /* Notify the subscription has been disabled and exit */
5946 8 : ereport(LOG,
5947 : errmsg("subscription \"%s\" has been disabled because of an error",
5948 : MySubscription->name));
5949 :
5950 : /*
5951 : * Skip the track_commit_timestamp check when disabling the worker due to
5952 : * an error, as verifying commit timestamps is unnecessary in this
5953 : * context.
5954 : */
5955 8 : CheckSubDeadTupleRetention(false, true, WARNING,
5956 8 : MySubscription->retaindeadtuples,
5957 8 : MySubscription->retentionactive, false);
5958 :
5959 8 : proc_exit(0);
5960 : }
5961 :
5962 : /*
5963 : * Is current process a logical replication worker?
5964 : */
5965 : bool
5966 4046 : IsLogicalWorker(void)
5967 : {
5968 4046 : return MyLogicalRepWorker != NULL;
5969 : }
5970 :
/*
 * IsLogicalParallelApplyWorker
 *		Report whether the current process is a logical replication parallel
 *		apply worker.
 */
bool
IsLogicalParallelApplyWorker(void)
{
	/* Must be a logical replication worker to begin with. */
	if (!IsLogicalWorker())
		return false;

	return am_parallel_apply_worker();
}
5979 :
5980 : /*
5981 : * Start skipping changes of the transaction if the given LSN matches the
5982 : * LSN specified by subscription's skiplsn.
5983 : */
5984 : static void
5985 1044 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
5986 : {
5987 : Assert(!is_skipping_changes());
5988 : Assert(!in_remote_transaction);
5989 : Assert(!in_streamed_transaction);
5990 :
5991 : /*
5992 : * Quick return if it's not requested to skip this transaction. This
5993 : * function is called for every remote transaction and we assume that
5994 : * skipping the transaction is not used often.
5995 : */
5996 1044 : if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
5997 : MySubscription->skiplsn != finish_lsn))
5998 1038 : return;
5999 :
6000 : /* Start skipping all changes of this transaction */
6001 6 : skip_xact_finish_lsn = finish_lsn;
6002 :
6003 6 : ereport(LOG,
6004 : errmsg("logical replication starts skipping transaction at LSN %X/%08X",
6005 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6006 : }
6007 :
6008 : /*
6009 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
6010 : */
6011 : static void
6012 54 : stop_skipping_changes(void)
6013 : {
6014 54 : if (!is_skipping_changes())
6015 48 : return;
6016 :
6017 6 : ereport(LOG,
6018 : errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6019 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6020 :
6021 : /* Stop skipping changes */
6022 6 : skip_xact_finish_lsn = InvalidXLogRecPtr;
6023 : }
6024 :
6025 : /*
6026 : * Clear subskiplsn of pg_subscription catalog.
6027 : *
6028 : * finish_lsn is the transaction's finish LSN that is used to check if the
6029 : * subskiplsn matches it. If not matched, we raise a warning when clearing the
6030 : * subskiplsn in order to inform users for cases e.g., where the user mistakenly
6031 : * specified the wrong subskiplsn.
6032 : */
6033 : static void
6034 1054 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
6035 : {
6036 : Relation rel;
6037 : Form_pg_subscription subform;
6038 : HeapTuple tup;
6039 1054 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
6040 1054 : bool started_tx = false;
6041 :
6042 1054 : if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
6043 1048 : return;
6044 :
6045 6 : if (!IsTransactionState())
6046 : {
6047 2 : StartTransactionCommand();
6048 2 : started_tx = true;
6049 : }
6050 :
6051 : /*
6052 : * Updating pg_subscription might involve TOAST table access, so ensure we
6053 : * have a valid snapshot.
6054 : */
6055 6 : PushActiveSnapshot(GetTransactionSnapshot());
6056 :
6057 : /*
6058 : * Protect subskiplsn of pg_subscription from being concurrently updated
6059 : * while clearing it.
6060 : */
6061 6 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
6062 : AccessShareLock);
6063 :
6064 6 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
6065 :
6066 : /* Fetch the existing tuple. */
6067 6 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
6068 : ObjectIdGetDatum(MySubscription->oid));
6069 :
6070 6 : if (!HeapTupleIsValid(tup))
6071 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6072 :
6073 6 : subform = (Form_pg_subscription) GETSTRUCT(tup);
6074 :
6075 : /*
6076 : * Clear the subskiplsn. If the user has already changed subskiplsn before
6077 : * clearing it we don't update the catalog and the replication origin
6078 : * state won't get advanced. So in the worst case, if the server crashes
6079 : * before sending an acknowledgment of the flush position the transaction
6080 : * will be sent again and the user needs to set subskiplsn again. We can
6081 : * reduce the possibility by logging a replication origin WAL record to
6082 : * advance the origin LSN instead but there is no way to advance the
6083 : * origin timestamp and it doesn't seem to be worth doing anything about
6084 : * it since it's a very rare case.
6085 : */
6086 6 : if (subform->subskiplsn == myskiplsn)
6087 : {
6088 : bool nulls[Natts_pg_subscription];
6089 : bool replaces[Natts_pg_subscription];
6090 : Datum values[Natts_pg_subscription];
6091 :
6092 6 : memset(values, 0, sizeof(values));
6093 6 : memset(nulls, false, sizeof(nulls));
6094 6 : memset(replaces, false, sizeof(replaces));
6095 :
6096 : /* reset subskiplsn */
6097 6 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
6098 6 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
6099 :
6100 6 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
6101 : replaces);
6102 6 : CatalogTupleUpdate(rel, &tup->t_self, tup);
6103 :
6104 6 : if (myskiplsn != finish_lsn)
6105 0 : ereport(WARNING,
6106 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6107 : errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6108 : LSN_FORMAT_ARGS(finish_lsn),
6109 : LSN_FORMAT_ARGS(myskiplsn)));
6110 : }
6111 :
6112 6 : heap_freetuple(tup);
6113 6 : table_close(rel, NoLock);
6114 :
6115 6 : PopActiveSnapshot();
6116 :
6117 6 : if (started_tx)
6118 2 : CommitTransactionCommand();
6119 : }
6120 :
6121 : /* Error callback to give more context info about the change being applied */
6122 : void
6123 5810 : apply_error_callback(void *arg)
6124 : {
6125 5810 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
6126 :
6127 5810 : if (apply_error_callback_arg.command == 0)
6128 4068 : return;
6129 :
6130 : Assert(errarg->origin_name);
6131 :
6132 1742 : if (errarg->rel == NULL)
6133 : {
6134 650 : if (!TransactionIdIsValid(errarg->remote_xid))
6135 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6136 : errarg->origin_name,
6137 : logicalrep_message_type(errarg->command));
6138 650 : else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
6139 518 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6140 : errarg->origin_name,
6141 : logicalrep_message_type(errarg->command),
6142 : errarg->remote_xid);
6143 : else
6144 264 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6145 : errarg->origin_name,
6146 : logicalrep_message_type(errarg->command),
6147 : errarg->remote_xid,
6148 132 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6149 : }
6150 : else
6151 : {
6152 1092 : if (errarg->remote_attnum < 0)
6153 : {
6154 1092 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
6155 1928 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6156 : errarg->origin_name,
6157 : logicalrep_message_type(errarg->command),
6158 964 : errarg->rel->remoterel.nspname,
6159 964 : errarg->rel->remoterel.relname,
6160 : errarg->remote_xid);
6161 : else
6162 256 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6163 : errarg->origin_name,
6164 : logicalrep_message_type(errarg->command),
6165 128 : errarg->rel->remoterel.nspname,
6166 128 : errarg->rel->remoterel.relname,
6167 : errarg->remote_xid,
6168 128 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6169 : }
6170 : else
6171 : {
6172 0 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
6173 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6174 : errarg->origin_name,
6175 : logicalrep_message_type(errarg->command),
6176 0 : errarg->rel->remoterel.nspname,
6177 0 : errarg->rel->remoterel.relname,
6178 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6179 : errarg->remote_xid);
6180 : else
6181 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6182 : errarg->origin_name,
6183 : logicalrep_message_type(errarg->command),
6184 0 : errarg->rel->remoterel.nspname,
6185 0 : errarg->rel->remoterel.relname,
6186 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6187 : errarg->remote_xid,
6188 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6189 : }
6190 : }
6191 : }
6192 :
6193 : /* Set transaction information of apply error callback */
6194 : static inline void
6195 5746 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
6196 : {
6197 5746 : apply_error_callback_arg.remote_xid = xid;
6198 5746 : apply_error_callback_arg.finish_lsn = lsn;
6199 5746 : }
6200 :
6201 : /* Reset all information of apply error callback */
6202 : static inline void
6203 2824 : reset_apply_error_context_info(void)
6204 : {
6205 2824 : apply_error_callback_arg.command = 0;
6206 2824 : apply_error_callback_arg.rel = NULL;
6207 2824 : apply_error_callback_arg.remote_attnum = -1;
6208 2824 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
6209 2824 : }
6210 :
6211 : /*
6212 : * Request wakeup of the workers for the given subscription OID
6213 : * at commit of the current transaction.
6214 : *
6215 : * This is used to ensure that the workers process assorted changes
6216 : * as soon as possible.
6217 : */
6218 : void
6219 440 : LogicalRepWorkersWakeupAtCommit(Oid subid)
6220 : {
6221 : MemoryContext oldcxt;
6222 :
6223 440 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
6224 440 : on_commit_wakeup_workers_subids =
6225 440 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
6226 440 : MemoryContextSwitchTo(oldcxt);
6227 440 : }
6228 :
6229 : /*
6230 : * Wake up the workers of any subscriptions that were changed in this xact.
6231 : */
6232 : void
6233 1114696 : AtEOXact_LogicalRepWorkers(bool isCommit)
6234 : {
6235 1114696 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
6236 : {
6237 : ListCell *lc;
6238 :
6239 430 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
6240 860 : foreach(lc, on_commit_wakeup_workers_subids)
6241 : {
6242 430 : Oid subid = lfirst_oid(lc);
6243 : List *workers;
6244 : ListCell *lc2;
6245 :
6246 430 : workers = logicalrep_workers_find(subid, true, false);
6247 562 : foreach(lc2, workers)
6248 : {
6249 132 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
6250 :
6251 132 : logicalrep_worker_wakeup_ptr(worker);
6252 : }
6253 : }
6254 430 : LWLockRelease(LogicalRepWorkerLock);
6255 : }
6256 :
6257 : /* The List storage will be reclaimed automatically in xact cleanup. */
6258 1114696 : on_commit_wakeup_workers_subids = NIL;
6259 1114696 : }
6260 :
6261 : /*
6262 : * Allocate the origin name in long-lived context for error context message.
6263 : */
6264 : void
6265 818 : set_apply_error_context_origin(char *originname)
6266 : {
6267 818 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
6268 : originname);
6269 818 : }
6270 :
6271 : /*
6272 : * Return the action to be taken for the given transaction. See
6273 : * TransApplyAction for information on each of the actions.
6274 : *
6275 : * *winfo is assigned to the destination parallel worker info when the leader
6276 : * apply worker has to pass all the transaction's changes to the parallel
6277 : * apply worker.
6278 : */
6279 : static TransApplyAction
6280 643008 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
6281 : {
6282 643008 : *winfo = NULL;
6283 :
6284 643008 : if (am_parallel_apply_worker())
6285 : {
6286 128388 : return TRANS_PARALLEL_APPLY;
6287 : }
6288 :
6289 : /*
6290 : * If we are processing this transaction using a parallel apply worker
6291 : * then either we send the changes to the parallel worker or if the worker
6292 : * is busy then serialize the changes to the file which will later be
6293 : * processed by the parallel worker.
6294 : */
6295 514620 : *winfo = pa_find_worker(xid);
6296 :
6297 514620 : if (*winfo && (*winfo)->serialize_changes)
6298 : {
6299 10074 : return TRANS_LEADER_PARTIAL_SERIALIZE;
6300 : }
6301 504546 : else if (*winfo)
6302 : {
6303 127776 : return TRANS_LEADER_SEND_TO_PARALLEL;
6304 : }
6305 :
6306 : /*
6307 : * If there is no parallel worker involved to process this transaction
6308 : * then we either directly apply the change or serialize it to a file
6309 : * which will later be applied when the transaction finish message is
6310 : * processed.
6311 : */
6312 376770 : else if (in_streamed_transaction)
6313 : {
6314 206398 : return TRANS_LEADER_SERIALIZE;
6315 : }
6316 : else
6317 : {
6318 170372 : return TRANS_LEADER_APPLY;
6319 : }
6320 : }
|