Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server facing code and shares libpqwalreceiver
19 : * module with walreceiver for providing the libpq specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option as on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions has
33 : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which is then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in tmp file directory by default, and the filenames
38 : * include both the XID of the toplevel transaction and OID of the
39 : * subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID don't interfere.
41 : *
42 : * We use BufFiles instead of using normal temporary files because (a) the
43 : * BufFile infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) provides a way for automatic clean up on the error and (c) provides
45 : * a way to survive these files across local transactions and allow to open and
46 : * close at stream start and close. We decided to use FileSet
47 : * infrastructure as without that it deletes the files on the closure of the
48 : * file and if we decide to keep stream files open across the start/stop stream
49 : * then it will consume a lot of memory (more than 8K for each BufFile and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we don't use FileSet then we also need to invent
53 : * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 : * the file we desired across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option as parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : * tablesync worker might not get such a prepared transaction because say it
72 : * was prior to the initial consistent point but might have got some later
73 : * commits. Now, the tablesync worker will exit without doing anything for the
74 : * prepared transaction skipped by the apply worker as the sync location for it
75 : * will be already ahead of the apply worker's current location. This would lead
76 : * to an "empty prepare", because later when the apply worker does the commit
77 : * prepare, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this, and similar prepare confusions the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * ProcessSyncingTablesForApply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING it starts streaming messages with the
98 : * two_phase option which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time we have not enabled two_phase, the
101 : * publisher (replication server) would have skipped some prepares but we
102 : * ensure that such prepares are sent along with commit prepare, see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * Finally, to avoid problems mentioned in previous paragraphs from any
113 : * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
114 : * to 'off' and then again back to 'on') there is a restriction for
115 : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 : * the two_phase tri-state is ENABLED, except when copy_data = false.
117 : *
118 : * We can get prepare of the same GID more than once for the genuine cases
119 : * where we have defined multiple subscriptions for publications on the same
120 : * server and prepared transaction has operations on tables subscribed to those
121 : * subscriptions. For such cases, if we use the GID sent by publisher one of
122 : * the prepares will be successful and others will fail, in which case the
123 : * server will send them again. Now, this can lead to a deadlock if user has
124 : * set synchronous_standby_names for all the subscriptions on subscriber. To
125 : * avoid such deadlocks, we generate a unique GID (consisting of the
126 : * subscription oid and the xid of the prepared transaction) for each prepare
127 : * transaction on the subscriber.
128 : *
129 : * FAILOVER
130 : * ----------------------
131 : * The logical slot on the primary can be synced to the standby by specifying
132 : * failover = true when creating the subscription. Enabling failover allows us
133 : * to smoothly transition to the promoted standby, ensuring that we can
134 : * subscribe to the new primary without losing any data.
135 : *
136 : * RETAIN DEAD TUPLES
137 : * ----------------------
138 : * Each apply worker that enabled retain_dead_tuples option maintains a
139 : * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
140 : * prevent dead rows from being removed prematurely when the apply worker still
141 : * needs them to detect update_deleted conflicts. Additionally, this helps to
142 : * retain the required commit_ts module information, which further helps to
143 : * detect update_origin_differs and delete_origin_differs conflicts reliably, as
144 : * otherwise, vacuum freeze could remove the required information.
145 : *
146 : * The logical replication launcher manages an internal replication slot named
147 : * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 : * transaction ID from all apply workers to determine the appropriate xmin for
149 : * the slot, thereby retaining necessary tuples.
150 : *
151 : * The non-removable transaction ID in the apply worker is advanced to the
152 : * oldest running transaction ID once all concurrent transactions on the
153 : * publisher have been applied and flushed locally. The process involves:
154 : *
155 : * - RDT_GET_CANDIDATE_XID:
156 : * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 : * candidate xid.
158 : *
159 : * - RDT_REQUEST_PUBLISHER_STATUS:
160 : * Send a message to the walsender requesting the publisher status, which
161 : * includes the latest WAL write position and information about transactions
162 : * that are in the commit phase.
163 : *
164 : * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 : * Wait for the status from the walsender. After receiving the first status,
166 : * do not proceed if there are concurrent remote transactions that are still
167 : * in the commit phase. These transactions might have been assigned an
168 : * earlier commit timestamp but have not yet written the commit WAL record.
169 : * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 : * until all these transactions have completed.
171 : *
172 : * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 : * Advance the non-removable transaction ID if the current flush location has
174 : * reached or surpassed the last received WAL position.
175 : *
176 : * - RDT_STOP_CONFLICT_INFO_RETENTION:
177 : * This phase is required only when max_retention_duration is defined. We
178 : * enter this phase if the wait time in either the
179 : * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
180 : * configured max_retention_duration. In this phase,
181 : * pg_subscription.subretentionactive is updated to false within a new
182 : * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
183 : *
184 : * - RDT_RESUME_CONFLICT_INFO_RETENTION:
185 : * This phase is required only when max_retention_duration is defined. We
186 : * enter this phase if the retention was previously stopped, and the time
187 : * required to advance the non-removable transaction ID in the
188 : * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
189 : * (or if max_retention_duration is set to 0). During this phase,
190 : * pg_subscription.subretentionactive is updated to true within a new
191 : * transaction, and the worker will be restarted.
192 : *
193 : * The overall state progression is: GET_CANDIDATE_XID ->
194 : * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
195 : * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
196 : * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
197 : *
198 : * Retaining the dead tuples for this period is sufficient for ensuring
199 : * eventual consistency using last-update-wins strategy, as dead tuples are
200 : * useful for detecting conflicts only during the application of concurrent
201 : * transactions from remote nodes. After applying and flushing all remote
202 : * transactions that occurred concurrently with the tuple DELETE, any
203 : * subsequent UPDATE from a remote node should have a later timestamp. In such
204 : * cases, it is acceptable to detect an update_missing scenario and convert the
205 : * UPDATE to an INSERT when applying it. But, for concurrent remote
206 : * transactions with earlier timestamps than the DELETE, detecting
207 : * update_deleted is necessary, as the UPDATEs in remote transactions should be
208 : * ignored if their timestamp is earlier than that of the dead tuples.
209 : *
210 : * Note that advancing the non-removable transaction ID is not supported if the
211 : * publisher is also a physical standby. This is because the logical walsender
212 : * on the standby can only get the WAL replay position but there may be more
213 : * WALs that are being replicated from the primary and those WALs could have
214 : * earlier commit timestamp.
215 : *
216 : * Similarly, when the publisher has subscribed to another publisher,
217 : * information necessary for conflict detection cannot be retained for
218 : * changes from origins other than the publisher. This is because publisher
219 : * lacks the information on concurrent transactions of other publishers to
220 : * which it subscribes. As the information on concurrent transactions is
221 : * unavailable beyond subscriber's immediate publishers, the non-removable
222 : * transaction ID might be advanced prematurely before changes from other
223 : * origins have been fully applied.
224 : *
225 : * XXX Retaining information for changes from other origins might be possible
226 : * by requesting the subscription on that origin to enable retain_dead_tuples
227 : * and fetching the conflict detection slot.xmin along with the publisher's
228 : * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
229 : * wait for the remote slot's xmin to reach the oldest active transaction ID,
230 : * ensuring that all transactions from other origins have been applied on the
231 : * publisher, thereby getting the latest WAL position that includes all
232 : * concurrent changes. However, this approach may impact performance, so it
233 : * might not be worth the effort.
234 : *
235 : * XXX It seems feasible to get the latest commit's WAL location from the
236 : * publisher and wait till that is applied. However, we can't do that
237 : * because commit timestamps can regress as a commit with a later LSN is not
238 : * guaranteed to have a later timestamp than those with earlier LSNs. Having
239 : * said that, even if that is possible, it won't improve performance much as
240 : * the apply always lags and moves slowly as compared with the transactions
241 : * on the publisher.
242 : *-------------------------------------------------------------------------
243 : */
244 :
245 : #include "postgres.h"
246 :
247 : #include <sys/stat.h>
248 : #include <unistd.h>
249 :
250 : #include "access/commit_ts.h"
251 : #include "access/table.h"
252 : #include "access/tableam.h"
253 : #include "access/twophase.h"
254 : #include "access/xact.h"
255 : #include "catalog/indexing.h"
256 : #include "catalog/pg_inherits.h"
257 : #include "catalog/pg_subscription.h"
258 : #include "catalog/pg_subscription_rel.h"
259 : #include "commands/subscriptioncmds.h"
260 : #include "commands/tablecmds.h"
261 : #include "commands/trigger.h"
262 : #include "executor/executor.h"
263 : #include "executor/execPartition.h"
264 : #include "libpq/pqformat.h"
265 : #include "miscadmin.h"
266 : #include "optimizer/optimizer.h"
267 : #include "parser/parse_relation.h"
268 : #include "pgstat.h"
269 : #include "postmaster/bgworker.h"
270 : #include "postmaster/interrupt.h"
271 : #include "postmaster/walwriter.h"
272 : #include "replication/conflict.h"
273 : #include "replication/logicallauncher.h"
274 : #include "replication/logicalproto.h"
275 : #include "replication/logicalrelation.h"
276 : #include "replication/logicalworker.h"
277 : #include "replication/origin.h"
278 : #include "replication/slot.h"
279 : #include "replication/walreceiver.h"
280 : #include "replication/worker_internal.h"
281 : #include "rewrite/rewriteHandler.h"
282 : #include "storage/buffile.h"
283 : #include "storage/ipc.h"
284 : #include "storage/latch.h"
285 : #include "storage/lmgr.h"
286 : #include "storage/procarray.h"
287 : #include "tcop/tcopprot.h"
288 : #include "utils/acl.h"
289 : #include "utils/guc.h"
290 : #include "utils/inval.h"
291 : #include "utils/lsyscache.h"
292 : #include "utils/memutils.h"
293 : #include "utils/pg_lsn.h"
294 : #include "utils/rel.h"
295 : #include "utils/rls.h"
296 : #include "utils/snapmgr.h"
297 : #include "utils/syscache.h"
298 : #include "utils/usercontext.h"
299 : #include "utils/wait_event.h"
300 :
301 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
302 :
/*
 * FlushPosition
 *
 * Pairs the end LSN of a remote transaction with the end LSN of the
 * corresponding local transaction.  Entries live in lsn_mapping and are
 * presumably consumed when computing flush feedback for the publisher
 * (see send_feedback) — confirm against the feedback code.
 */
typedef struct FlushPosition
{
	dlist_node	node;			/* list link in lsn_mapping */
	XLogRecPtr	local_end;		/* end LSN of the local transaction */
	XLogRecPtr	remote_end;		/* end LSN of the remote transaction */
} FlushPosition;

/* List of FlushPosition entries not yet confirmed as flushed locally */
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
311 :
/*
 * ApplyExecutionData
 *
 * Bundles the executor state needed while applying one change to a
 * replication target relation.
 */
typedef struct ApplyExecutionData
{
	EState	   *estate;			/* executor state, used to track resources */

	LogicalRepRelMapEntry *targetRel;	/* replication target rel */
	ResultRelInfo *targetRelInfo;	/* ResultRelInfo for same */

	/* These fields are used when the target relation is partitioned: */
	ModifyTableState *mtstate;	/* dummy ModifyTable state */
	PartitionTupleRouting *proute;	/* partition routing info */
} ApplyExecutionData;
323 :
/* Struct for saving and restoring apply errcontext information */
typedef struct ApplyErrorCallbackArg
{
	LogicalRepMsgType command;	/* 0 if invalid */
	LogicalRepRelMapEntry *rel; /* relation the current change targets, if
								 * any */

	/* Remote node information */
	int			remote_attnum;	/* -1 if invalid */
	TransactionId remote_xid;	/* XID of the remote transaction (set via
								 * set_apply_error_context_xact) */
	XLogRecPtr	finish_lsn;		/* finish LSN of the remote transaction */
	char	   *origin_name;	/* replication origin name, for error
								 * reporting */
} ApplyErrorCallbackArg;
336 :
337 : /*
338 : * The action to be taken for the changes in the transaction.
339 : *
340 : * TRANS_LEADER_APPLY:
341 : * This action means that we are in the leader apply worker or table sync
342 : * worker. The changes of the transaction are either directly applied or
343 : * are read from temporary files (for streaming transactions) and then
344 : * applied by the worker.
345 : *
346 : * TRANS_LEADER_SERIALIZE:
347 : * This action means that we are in the leader apply worker or table sync
348 : * worker. Changes are written to temporary files and then applied when the
349 : * final commit arrives.
350 : *
351 : * TRANS_LEADER_SEND_TO_PARALLEL:
352 : * This action means that we are in the leader apply worker and need to send
353 : * the changes to the parallel apply worker.
354 : *
355 : * TRANS_LEADER_PARTIAL_SERIALIZE:
356 : * This action means that we are in the leader apply worker and have sent some
357 : * changes directly to the parallel apply worker and the remaining changes are
358 : * serialized to a file, due to timeout while sending data. The parallel apply
359 : * worker will apply these serialized changes when the final commit arrives.
360 : *
361 : * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
362 : * serializing changes, the leader worker also needs to serialize the
363 : * STREAM_XXX message to a file, and wait for the parallel apply worker to
364 : * finish the transaction when processing the transaction finish command. So
365 : * this new action was introduced to keep the code and logic clear.
366 : *
367 : * TRANS_PARALLEL_APPLY:
368 : * This action means that we are in the parallel apply worker and changes of
369 : * the transaction are applied directly by the worker.
370 : */
typedef enum
{
	/* The action for non-streaming transactions. */
	TRANS_LEADER_APPLY,			/* apply changes directly (leader/tablesync) */

	/* Actions for streaming transactions (see comment above for details). */
	TRANS_LEADER_SERIALIZE,		/* spool changes to a file; apply at commit */
	TRANS_LEADER_SEND_TO_PARALLEL,	/* forward changes to a parallel apply
									 * worker */
	TRANS_LEADER_PARTIAL_SERIALIZE, /* some changes sent to parallel worker,
									 * remainder serialized to a file */
	TRANS_PARALLEL_APPLY,		/* we are the parallel apply worker */
} TransApplyAction;
382 :
/*
 * The phases involved in advancing the non-removable transaction ID.
 *
 * See comments atop worker.c for details of the transition between these
 * phases.
 */
typedef enum
{
	RDT_GET_CANDIDATE_XID,		/* pick oldest active xid as candidate */
	RDT_REQUEST_PUBLISHER_STATUS,	/* ask walsender for publisher status */
	RDT_WAIT_FOR_PUBLISHER_STATUS,	/* wait for status; loop back while
									 * remote commit-phase xacts remain */
	RDT_WAIT_FOR_LOCAL_FLUSH,	/* wait until local flush passes the received
								 * remote WAL position */
	RDT_STOP_CONFLICT_INFO_RETENTION,	/* stop retention after exceeding
										 * max_retention_duration */
	RDT_RESUME_CONFLICT_INFO_RETENTION, /* re-enable retention once wait time
										 * is acceptable again */
} RetainDeadTuplesPhase;
398 :
/*
 * Critical information for managing phase transitions within the
 * RetainDeadTuplesPhase.
 */
typedef struct RetainDeadTuplesData
{
	RetainDeadTuplesPhase phase;	/* current phase */
	XLogRecPtr	remote_lsn;		/* WAL write position on the publisher */

	/*
	 * Oldest transaction ID that was in the commit phase on the publisher.
	 * Use FullTransactionId to prevent issues with transaction ID wraparound,
	 * where a new remote_oldestxid could falsely appear to originate from the
	 * past and block advancement.
	 */
	FullTransactionId remote_oldestxid;

	/*
	 * Next transaction ID to be assigned on the publisher. Use
	 * FullTransactionId for consistency and to allow straightforward
	 * comparisons with remote_oldestxid.
	 */
	FullTransactionId remote_nextxid;

	TimestampTz reply_time;		/* when the publisher responds with status */

	/*
	 * Publisher transaction ID that must be awaited to complete before
	 * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
	 * FullTransactionId for the same reason as remote_nextxid.
	 */
	FullTransactionId remote_wait_for;

	TransactionId candidate_xid;	/* candidate for the non-removable
									 * transaction ID */
	TimestampTz flushpos_update_time;	/* when the remote flush position was
										 * updated in final phase
										 * (RDT_WAIT_FOR_LOCAL_FLUSH) */

	long		table_sync_wait_time;	/* time spent waiting for table sync
										 * to finish (presumably milliseconds
										 * — confirm against callers) */

	/*
	 * The following fields are used to determine the timing for the next
	 * round of transaction ID advancement.
	 */
	TimestampTz last_recv_time; /* when the last message was received */
	TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
	int			xid_advance_interval;	/* how much time (ms) to wait before
										 * attempting to advance the
										 * non-removable transaction ID */
} RetainDeadTuplesData;
451 :
452 : /*
453 : * The minimum (100ms) and maximum (3 minutes) intervals for advancing
454 : * non-removable transaction IDs. The maximum interval is a bit arbitrary but
455 : * is sufficient to not cause any undue network traffic.
456 : */
457 : #define MIN_XID_ADVANCE_INTERVAL 100
458 : #define MAX_XID_ADVANCE_INTERVAL 180000
459 :
/* errcontext tracker */
static ApplyErrorCallbackArg apply_error_callback_arg =
{
	.command = 0,
	.rel = NULL,
	.remote_attnum = -1,
	.remote_xid = InvalidTransactionId,
	.finish_lsn = InvalidXLogRecPtr,
	.origin_name = NULL,
};

/* Error-context callback stack used while applying changes */
ErrorContextCallback *apply_error_context_stack = NULL;

/* Memory contexts of the apply worker */
MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;

/* per stream context for streaming transactions */
static MemoryContext LogicalStreamingContext = NULL;

/* Connection to the walsender on the publisher */
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;

/* Subscription whose changes this worker applies */
Subscription *MySubscription = NULL;
static bool MySubscriptionValid = false;	/* is MySubscription up to date?
											 * presumably reset on catalog
											 * invalidation — confirm */

/* Subscription OIDs whose workers should be woken up at commit */
static List *on_commit_wakeup_workers_subids = NIL;

bool		in_remote_transaction = false;
/* Finish LSN of the remote transaction being applied; compared against
 * rel->statelsn in should_apply_changes_for_rel */
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

/* fields valid only when processing streamed transaction */
static bool in_streamed_transaction = false;

/* Toplevel XID of the streamed transaction currently being processed */
static TransactionId stream_xid = InvalidTransactionId;

/*
 * The number of changes applied by parallel apply worker during one streaming
 * block.
 */
static uint32 parallel_stream_nchanges = 0;

/* Are we initializing an apply worker? */
bool		InitializingApplyWorker = false;

/*
 * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
 * the subscription if the remote transaction's finish LSN matches the subskiplsn.
 * Once we start skipping changes, we don't stop it until we skip all changes of
 * the transaction even if pg_subscription is updated and MySubscription->skiplsn
 * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
 * we don't skip receiving and spooling the changes since we decide whether or not
 * to skip applying the changes when starting to apply changes. The subskiplsn is
 * cleared after successfully skipping the transaction or applying non-empty
 * transaction. The latter prevents the mistakenly specified subskiplsn from
 * being left. Note that we cannot skip the streaming transactions when using
 * parallel apply workers because we cannot get the finish LSN before applying
 * the changes. So, we don't start parallel apply worker when finish LSN is set
 * by the user.
 */
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
#define is_skipping_changes() (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))

/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;

/*
 * The remote WAL position that has been applied and flushed locally. We record
 * and use this information both while sending feedback to the server and
 * advancing oldest_nonremovable_xid.
 */
static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
530 :
/*
 * Start position of a subtransaction within the serialized changes file.
 * Used to truncate the file back to this offset when the subtransaction
 * aborts (see STREAMED TRANSACTIONS in the file header).
 */
typedef struct SubXactInfo
{
	TransactionId xid;			/* XID of the subxact */
	int			fileno;			/* file number in the buffile */
	pgoff_t		offset;			/* offset in the file */
} SubXactInfo;

/* Sub-transaction data for the current streaming transaction */
typedef struct ApplySubXactData
{
	uint32		nsubxacts;		/* number of sub-transactions */
	uint32		nsubxacts_max;	/* current capacity of subxacts */
	TransactionId subxact_last; /* xid of the last sub-transaction */
	SubXactInfo *subxacts;		/* sub-xact offset in changes file */
} ApplySubXactData;

/* Current streaming transaction's subxact bookkeeping (empty initially) */
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
548 :
549 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
550 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
551 :
552 : /*
553 : * Information about subtransactions of a given toplevel transaction.
554 : */
555 : static void subxact_info_write(Oid subid, TransactionId xid);
556 : static void subxact_info_read(Oid subid, TransactionId xid);
557 : static void subxact_info_add(TransactionId xid);
558 : static inline void cleanup_subxact_info(void);
559 :
560 : /*
561 : * Serialize and deserialize changes for a toplevel transaction.
562 : */
563 : static void stream_open_file(Oid subid, TransactionId xid,
564 : bool first_segment);
565 : static void stream_write_change(char action, StringInfo s);
566 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
567 : static void stream_close_file(void);
568 :
569 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
570 :
571 : static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
572 : bool status_received);
573 : static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
574 : static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
575 : bool status_received);
576 : static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
577 : static void request_publisher_status(RetainDeadTuplesData *rdt_data);
578 : static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
579 : bool status_received);
580 : static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
581 : static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
582 : static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
583 : static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data);
584 : static bool update_retention_status(bool active);
585 : static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
586 : static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
587 : bool new_xid_found);
588 :
589 : static void apply_worker_exit(void);
590 :
591 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
592 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
593 : ResultRelInfo *relinfo,
594 : TupleTableSlot *remoteslot);
595 : static void apply_handle_update_internal(ApplyExecutionData *edata,
596 : ResultRelInfo *relinfo,
597 : TupleTableSlot *remoteslot,
598 : LogicalRepTupleData *newtup,
599 : Oid localindexoid);
600 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
601 : ResultRelInfo *relinfo,
602 : TupleTableSlot *remoteslot,
603 : Oid localindexoid);
604 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
605 : LogicalRepRelation *remoterel,
606 : Oid localidxoid,
607 : TupleTableSlot *remoteslot,
608 : TupleTableSlot **localslot);
609 : static bool FindDeletedTupleInLocalRel(Relation localrel,
610 : Oid localidxoid,
611 : TupleTableSlot *remoteslot,
612 : TransactionId *delete_xid,
613 : ReplOriginId *delete_origin,
614 : TimestampTz *delete_time);
615 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
616 : TupleTableSlot *remoteslot,
617 : LogicalRepTupleData *newtup,
618 : CmdType operation);
619 :
620 : /* Functions for skipping changes */
621 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
622 : static void stop_skipping_changes(void);
623 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
624 :
625 : /* Functions for apply error callback */
626 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
627 : static inline void reset_apply_error_context_info(void);
628 :
629 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
630 : ParallelApplyWorkerInfo **winfo);
631 :
632 : static void set_wal_receiver_timeout(void);
633 :
634 : static void on_exit_clear_xact_state(int code, Datum arg);
635 :
636 : /*
637 : * Form the origin name for the subscription.
638 : *
639 : * This is a common function for tablesync and other workers. Tablesync workers
640 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
641 : *
642 : * Return the name in the supplied buffer.
643 : */
644 : void
645 1397 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
646 : char *originname, Size szoriginname)
647 : {
648 1397 : if (OidIsValid(relid))
649 : {
650 : /* Replication origin name for tablesync workers. */
651 790 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
652 : }
653 : else
654 : {
655 : /* Replication origin name for non-tablesync workers. */
656 607 : snprintf(originname, szoriginname, "pg_%u", suboid);
657 : }
658 1397 : }
659 :
660 : /*
661 : * Should this worker apply changes for given relation.
662 : *
663 : * This is mainly needed for initial relation data sync as that runs in
664 : * separate worker process running in parallel and we need some way to skip
665 : * changes coming to the leader apply worker during the sync of a table.
666 : *
667 : * Note we need to do smaller or equals comparison for SYNCDONE state because
668 : * it might hold position of end of initial slot consistent point WAL
669 : * record + 1 (ie start of next record) and next record can be COMMIT of
670 : * transaction we are now processing (which is what we set remote_final_lsn
671 : * to in apply_handle_begin).
672 : *
673 : * Note that for streaming transactions that are being applied in the parallel
674 : * apply worker, we disallow applying changes if the target table in the
675 : * subscription is not in the READY state, because we cannot decide whether to
676 : * apply the change as we won't know remote_final_lsn by that time.
677 : *
678 : * We already checked this in pa_can_start() before assigning the
679 : * streaming transaction to the parallel worker, but it also needs to be
680 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
681 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
682 : * while applying this transaction.
683 : */
684 : static bool
685 148526 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
686 : {
687 148526 : switch (MyLogicalRepWorker->type)
688 : {
689 0 : case WORKERTYPE_TABLESYNC:
690 0 : return MyLogicalRepWorker->relid == rel->localreloid;
691 :
692 68686 : case WORKERTYPE_PARALLEL_APPLY:
693 : /* We don't synchronize rel's that are in unknown state. */
694 68686 : if (rel->state != SUBREL_STATE_READY &&
695 0 : rel->state != SUBREL_STATE_UNKNOWN)
696 0 : ereport(ERROR,
697 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
698 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
699 : MySubscription->name),
700 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
701 :
702 68686 : return rel->state == SUBREL_STATE_READY;
703 :
704 79840 : case WORKERTYPE_APPLY:
705 79902 : return (rel->state == SUBREL_STATE_READY ||
706 62 : (rel->state == SUBREL_STATE_SYNCDONE &&
707 15 : rel->statelsn <= remote_final_lsn));
708 :
709 0 : case WORKERTYPE_SEQUENCESYNC:
710 : /* Should never happen. */
711 0 : elog(ERROR, "sequence synchronization worker is not expected to apply changes");
712 : break;
713 :
714 0 : case WORKERTYPE_UNKNOWN:
715 : /* Should never happen. */
716 0 : elog(ERROR, "Unknown worker type");
717 : }
718 :
719 0 : return false; /* dummy for compiler */
720 : }
721 :
/*
 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
 *
 * Start a transaction, if this is the first step (else we keep using the
 * existing transaction).
 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
 */
static void
begin_replication_step(void)
{
	SetCurrentStatementStartTimestamp();

	if (!IsTransactionState())
	{
		StartTransactionCommand();
		/* Pick up any subscription changes before applying the first step. */
		maybe_reread_subscription();
	}

	PushActiveSnapshot(GetTransactionSnapshot());

	MemoryContextSwitchTo(ApplyMessageContext);
}
744 :
/*
 * Finish up one step of a replication transaction.
 * Callers of begin_replication_step() must also call this.
 *
 * We don't close out the transaction here, but we should increment
 * the command counter to make the effects of this step visible to
 * subsequent steps within the same transaction.
 */
static void
end_replication_step(void)
{
	/* Release the snapshot pushed by begin_replication_step(). */
	PopActiveSnapshot();

	CommandCounterIncrement();
}
759 :
/*
 * Handle streamed transactions for both the leader apply worker and the
 * parallel apply workers.
 *
 * In the streaming case (receiving a block of the streamed transaction), for
 * serialize mode, simply redirect it to a file for the proper toplevel
 * transaction, and for parallel mode, the leader apply worker will send the
 * changes to parallel apply workers and the parallel apply worker will define
 * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
 * messages will be applied by both leader apply worker and parallel apply
 * workers).
 *
 * Returns true for streamed transactions (when the change is either serialized
 * to file or sent to parallel apply worker), false otherwise (regular mode or
 * needs to be processed by parallel apply worker).
 *
 * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
 * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
 * to a parallel apply worker.
 */
static bool
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
	TransactionId current_xid;
	ParallelApplyWorkerInfo *winfo;
	TransApplyAction apply_action;
	StringInfoData original_msg;

	apply_action = get_transaction_apply_action(stream_xid, &winfo);

	/* not in streaming mode */
	if (apply_action == TRANS_LEADER_APPLY)
		return false;

	Assert(TransactionIdIsValid(stream_xid));

	/*
	 * The parallel apply worker needs the xid in this message to decide
	 * whether to define a savepoint, so save the original message that has
	 * not moved the cursor after the xid. We will serialize this message to a
	 * file in PARTIAL_SERIALIZE mode.
	 */
	original_msg = *s;

	/*
	 * We should have received XID of the subxact as the first part of the
	 * message, so extract it.  Note this advances s->cursor past the xid,
	 * which is why original_msg was saved above.
	 */
	current_xid = pq_getmsgint(s, 4);

	if (!TransactionIdIsValid(current_xid))
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("invalid transaction ID in streamed replication transaction")));

	switch (apply_action)
	{
		case TRANS_LEADER_SERIALIZE:
			Assert(stream_fd);

			/* Add the new subxact to the array (unless already there). */
			subxact_info_add(current_xid);

			/* Write the change to the current file */
			stream_write_change(action, s);
			return true;

		case TRANS_LEADER_SEND_TO_PARALLEL:
			Assert(winfo);

			/*
			 * XXX The publisher side doesn't always send relation/type update
			 * messages after the streaming transaction, so also update the
			 * relation/type in leader apply worker. See function
			 * cleanup_rel_sync_cache.
			 */
			if (pa_send_data(winfo, s->len, s->data))
				return (action != LOGICAL_REP_MSG_RELATION &&
						action != LOGICAL_REP_MSG_TYPE);

			/*
			 * Switch to serialize mode when we are not able to send the
			 * change to parallel apply worker.
			 */
			pa_switch_to_partial_serialize(winfo, false);

			pg_fallthrough;
		case TRANS_LEADER_PARTIAL_SERIALIZE:
			/* Serialize the unconsumed copy, so the xid is still readable. */
			stream_write_change(action, &original_msg);

			/* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
			return (action != LOGICAL_REP_MSG_RELATION &&
					action != LOGICAL_REP_MSG_TYPE);

		case TRANS_PARALLEL_APPLY:
			parallel_stream_nchanges += 1;

			/* Define a savepoint for a subxact if needed. */
			pa_start_subtrans(current_xid, stream_xid);
			return false;

		default:
			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
			return false;		/* silence compiler warning */
	}
}
866 :
/*
 * Executor state preparation for evaluation of constraint expressions,
 * indexes and triggers for the specified relation.
 *
 * Returns a palloc'd ApplyExecutionData; the caller must eventually release
 * it with finish_edata().
 *
 * Note that the caller must open and close any indexes to be updated.
 */
static ApplyExecutionData *
create_edata_for_relation(LogicalRepRelMapEntry *rel)
{
	ApplyExecutionData *edata;
	EState	   *estate;
	RangeTblEntry *rte;
	List	   *perminfos = NIL;
	ResultRelInfo *resultRelInfo;

	edata = palloc0_object(ApplyExecutionData);
	edata->targetRel = rel;

	edata->estate = estate = CreateExecutorState();

	/* Build a one-entry range table describing the target relation. */
	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_RELATION;
	rte->relid = RelationGetRelid(rel->localrel);
	rte->relkind = rel->localrel->rd_rel->relkind;
	rte->rellockmode = AccessShareLock;

	addRTEPermissionInfo(&perminfos, rte);

	ExecInitRangeTable(estate, list_make1(rte), perminfos,
					   bms_make_singleton(1));

	edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);

	/*
	 * Use Relation opened by logicalrep_rel_open() instead of opening it
	 * again.
	 */
	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

	/*
	 * We put the ResultRelInfo in the es_opened_result_relations list, even
	 * though we don't populate the es_result_relations array. That's a bit
	 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
	 *
	 * ExecOpenIndices() is not called here either, each execution path doing
	 * an apply operation being responsible for that.
	 */
	estate->es_opened_result_relations =
		lappend(estate->es_opened_result_relations, resultRelInfo);

	estate->es_output_cid = GetCurrentCommandId(true);

	/* Prepare to catch AFTER triggers. */
	AfterTriggerBeginQuery();

	/* other fields of edata remain NULL for now */

	return edata;
}
926 :
/*
 * Finish any operations related to the executor state created by
 * create_edata_for_relation(), then free it.
 */
static void
finish_edata(ApplyExecutionData *edata)
{
	EState	   *estate = edata->estate;

	/* Handle any queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	/* Shut down tuple routing, if any was done. */
	if (edata->proute)
		ExecCleanupTupleRouting(edata->mtstate, edata->proute);

	/*
	 * Cleanup. It might seem that we should call ExecCloseResultRelations()
	 * here, but we intentionally don't. It would close the rel we added to
	 * es_opened_result_relations above, which is wrong because we took no
	 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
	 * any other relations opened during execution.
	 */
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);
	pfree(edata);
}
954 :
955 : /*
956 : * Executes default values for columns for which we can't map to remote
957 : * relation columns.
958 : *
959 : * This allows us to support tables which have more columns on the downstream
960 : * than on the upstream.
961 : */
962 : static void
963 76188 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
964 : TupleTableSlot *slot)
965 : {
966 76188 : TupleDesc desc = RelationGetDescr(rel->localrel);
967 76188 : int num_phys_attrs = desc->natts;
968 : int i;
969 : int attnum,
970 76188 : num_defaults = 0;
971 : int *defmap;
972 : ExprState **defexprs;
973 : ExprContext *econtext;
974 :
975 76188 : econtext = GetPerTupleExprContext(estate);
976 :
977 : /* We got all the data via replication, no need to evaluate anything. */
978 76188 : if (num_phys_attrs == rel->remoterel.natts)
979 36043 : return;
980 :
981 40145 : defmap = palloc_array(int, num_phys_attrs);
982 40145 : defexprs = palloc_array(ExprState *, num_phys_attrs);
983 :
984 : Assert(rel->attrmap->maplen == num_phys_attrs);
985 210663 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
986 : {
987 170518 : CompactAttribute *cattr = TupleDescCompactAttr(desc, attnum);
988 : Expr *defexpr;
989 :
990 170518 : if (cattr->attisdropped || cattr->attgenerated)
991 9 : continue;
992 :
993 170509 : if (rel->attrmap->attnums[attnum] >= 0)
994 92268 : continue;
995 :
996 78241 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
997 :
998 78241 : if (defexpr != NULL)
999 : {
1000 : /* Run the expression through planner */
1001 70131 : defexpr = expression_planner(defexpr);
1002 :
1003 : /* Initialize executable expression in copycontext */
1004 70131 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1005 70131 : defmap[num_defaults] = attnum;
1006 70131 : num_defaults++;
1007 : }
1008 : }
1009 :
1010 110276 : for (i = 0; i < num_defaults; i++)
1011 70131 : slot->tts_values[defmap[i]] =
1012 70131 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1013 : }
1014 :
/*
 * Store tuple data into slot.
 *
 * Incoming data can be either text or binary format; the appropriate type
 * input/receive function is invoked per column.  The slot ends up holding a
 * virtual tuple.
 */
static void
slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
				LogicalRepTupleData *tupleData)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;

	ExecClearTuple(slot);

	/* Call the "in" function for each non-dropped, non-null attribute */
	Assert(natts == rel->attrmap->maplen);
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap->attnums[i];

		if (!att->attisdropped && remoteattnum >= 0)
		{
			StringInfo	colvalue = &tupleData->colvalues[remoteattnum];

			Assert(remoteattnum < tupleData->ncols);

			/* Set attnum for error callback */
			apply_error_callback_arg.remote_attnum = remoteattnum;

			if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
			{
				Oid			typinput;
				Oid			typioparam;

				getTypeInputInfo(att->atttypid, &typinput, &typioparam);
				slot->tts_values[i] =
					OidInputFunctionCall(typinput, colvalue->data,
										 typioparam, att->atttypmod);
				slot->tts_isnull[i] = false;
			}
			else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
			{
				Oid			typreceive;
				Oid			typioparam;

				/*
				 * In some code paths we may be asked to re-parse the same
				 * tuple data. Reset the StringInfo's cursor so that works.
				 */
				colvalue->cursor = 0;

				getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
				slot->tts_values[i] =
					OidReceiveFunctionCall(typreceive, colvalue,
										   typioparam, att->atttypmod);

				/* Trouble if it didn't eat the whole buffer */
				if (colvalue->cursor != colvalue->len)
					ereport(ERROR,
							(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
							 errmsg("incorrect binary data format in logical replication column %d",
									remoteattnum + 1)));
				slot->tts_isnull[i] = false;
			}
			else
			{
				/*
				 * NULL value from remote. (We don't expect to see
				 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
				 * NULL.)
				 */
				slot->tts_values[i] = (Datum) 0;
				slot->tts_isnull[i] = true;
			}

			/* Reset attnum for error callback */
			apply_error_callback_arg.remote_attnum = -1;
		}
		else
		{
			/*
			 * We assign NULL to dropped attributes and missing values
			 * (missing values should be later filled using
			 * slot_fill_defaults).
			 */
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	ExecStoreVirtualTuple(slot);
}
1108 :
/*
 * Replace updated columns with data from the LogicalRepTupleData struct.
 * This is somewhat similar to heap_modify_tuple but also calls the type
 * input functions on the user data.
 *
 * "slot" is filled with a copy of the tuple in "srcslot", replacing
 * columns provided in "tupleData" and leaving others as-is.
 *
 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
 * storage for "srcslot". This is OK for current usage, but someday we may
 * need to materialize "slot" at the end to make it independent of "srcslot".
 */
static void
slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
				 LogicalRepRelMapEntry *rel,
				 LogicalRepTupleData *tupleData)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;

	/* We'll fill "slot" with a virtual tuple, so we must start with ... */
	ExecClearTuple(slot);

	/*
	 * Copy all the column data from srcslot, so that we'll have valid values
	 * for unreplaced columns.
	 */
	Assert(natts == srcslot->tts_tupleDescriptor->natts);
	slot_getallattrs(srcslot);
	memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
	memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));

	/* Call the "in" function for each replaced attribute */
	Assert(natts == rel->attrmap->maplen);
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap->attnums[i];

		/* Columns with no remote counterpart keep the srcslot value. */
		if (remoteattnum < 0)
			continue;

		Assert(remoteattnum < tupleData->ncols);

		if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
		{
			StringInfo	colvalue = &tupleData->colvalues[remoteattnum];

			/* Set attnum for error callback */
			apply_error_callback_arg.remote_attnum = remoteattnum;

			if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
			{
				Oid			typinput;
				Oid			typioparam;

				getTypeInputInfo(att->atttypid, &typinput, &typioparam);
				slot->tts_values[i] =
					OidInputFunctionCall(typinput, colvalue->data,
										 typioparam, att->atttypmod);
				slot->tts_isnull[i] = false;
			}
			else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
			{
				Oid			typreceive;
				Oid			typioparam;

				/*
				 * In some code paths we may be asked to re-parse the same
				 * tuple data. Reset the StringInfo's cursor so that works.
				 */
				colvalue->cursor = 0;

				getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
				slot->tts_values[i] =
					OidReceiveFunctionCall(typreceive, colvalue,
										   typioparam, att->atttypmod);

				/* Trouble if it didn't eat the whole buffer */
				if (colvalue->cursor != colvalue->len)
					ereport(ERROR,
							(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
							 errmsg("incorrect binary data format in logical replication column %d",
									remoteattnum + 1)));
				slot->tts_isnull[i] = false;
			}
			else
			{
				/* must be LOGICALREP_COLUMN_NULL */
				slot->tts_values[i] = (Datum) 0;
				slot->tts_isnull[i] = true;
			}

			/* Reset attnum for error callback */
			apply_error_callback_arg.remote_attnum = -1;
		}
	}

	/* And finally, declare that "slot" contains a valid virtual tuple */
	ExecStoreVirtualTuple(slot);
}
1210 :
1211 : /*
1212 : * Handle BEGIN message.
1213 : */
1214 : static void
1215 496 : apply_handle_begin(StringInfo s)
1216 : {
1217 : LogicalRepBeginData begin_data;
1218 :
1219 : /* There must not be an active streaming transaction. */
1220 : Assert(!TransactionIdIsValid(stream_xid));
1221 :
1222 496 : logicalrep_read_begin(s, &begin_data);
1223 496 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1224 :
1225 496 : remote_final_lsn = begin_data.final_lsn;
1226 :
1227 496 : maybe_start_skipping_changes(begin_data.final_lsn);
1228 :
1229 496 : in_remote_transaction = true;
1230 :
1231 496 : pgstat_report_activity(STATE_RUNNING, NULL);
1232 496 : }
1233 :
/*
 * Handle COMMIT message.
 *
 * Cross-checks the commit LSN against the one announced in BEGIN, then
 * commits the local transaction and processes pending relation syncs.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_commit(StringInfo s)
{
	LogicalRepCommitData commit_data;

	logicalrep_read_commit(s, &commit_data);

	/* Protocol sanity check: must match the LSN from apply_handle_begin. */
	if (commit_data.commit_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
								 LSN_FORMAT_ARGS(commit_data.commit_lsn),
								 LSN_FORMAT_ARGS(remote_final_lsn))));

	apply_handle_commit_internal(&commit_data);

	/*
	 * Process any tables that are being synchronized in parallel, as well as
	 * any newly added tables or sequences.
	 */
	ProcessSyncingRelations(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
1264 :
/*
 * Handle BEGIN PREPARE message.
 *
 * Like apply_handle_begin(), but for a two-phase transaction; the prepare
 * LSN plays the role of the final LSN.
 */
static void
apply_handle_begin_prepare(StringInfo s)
{
	LogicalRepPreparedTxnData begin_data;

	/* Tablesync should never receive prepare. */
	if (am_tablesync_worker())
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));

	/* There must not be an active streaming transaction. */
	Assert(!TransactionIdIsValid(stream_xid));

	logicalrep_read_begin_prepare(s, &begin_data);
	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);

	remote_final_lsn = begin_data.prepare_lsn;

	maybe_start_skipping_changes(begin_data.prepare_lsn);

	in_remote_transaction = true;

	pgstat_report_activity(STATE_RUNNING, NULL);
}
1293 :
/*
 * Common function to prepare the GID and issue the local PREPARE.
 *
 * Shared by apply_handle_prepare() and apply_handle_stream_prepare().
 */
static void
apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
{
	char		gid[GIDSIZE];

	/*
	 * Compute unique GID for two_phase transactions. We don't use GID of
	 * prepared transaction sent by server as that can lead to deadlock when
	 * we have multiple subscriptions from same node point to publications on
	 * the same node. See comments atop worker.c
	 */
	TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
						   gid, sizeof(gid));

	/*
	 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
	 * called within the PrepareTransactionBlock below.
	 */
	if (!IsTransactionBlock())
	{
		BeginTransactionBlock();
		CommitTransactionCommand();	/* Completes the preceding Begin command. */
	}

	/*
	 * Update origin state so we can restart streaming from correct position
	 * in case of crash.
	 */
	replorigin_xact_state.origin_lsn = prepare_data->end_lsn;
	replorigin_xact_state.origin_timestamp = prepare_data->prepare_time;

	PrepareTransactionBlock(gid);
}
1330 :
/*
 * Handle PREPARE message.
 *
 * Verifies the prepare LSN, prepares the local transaction under a
 * subscription-derived GID, and records the flush position.
 */
static void
apply_handle_prepare(StringInfo s)
{
	LogicalRepPreparedTxnData prepare_data;

	logicalrep_read_prepare(s, &prepare_data);

	/* Protocol sanity check against the LSN from BEGIN PREPARE. */
	if (prepare_data.prepare_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
								 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
								 LSN_FORMAT_ARGS(remote_final_lsn))));

	/*
	 * Unlike commit, here, we always prepare the transaction even though no
	 * change has happened in this transaction or all changes are skipped. It
	 * is done this way because at commit prepared time, we won't know whether
	 * we have skipped preparing a transaction because of those reasons.
	 *
	 * XXX, We can optimize such that at commit prepared time, we first check
	 * whether we have prepared the transaction or not but that doesn't seem
	 * worthwhile because such cases shouldn't be common.
	 */
	begin_replication_step();

	apply_handle_prepare_internal(&prepare_data);

	end_replication_step();
	CommitTransactionCommand();
	pgstat_report_stat(false);

	/*
	 * It is okay not to set the local_end LSN for the prepare because we
	 * always flush the prepare record. So, we can send the acknowledgment of
	 * the remote_end LSN as soon as prepare is finished.
	 *
	 * XXX For the sake of consistency with commit, we could have set it with
	 * the LSN of prepare but as of now we don't track that value similar to
	 * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
	 * it.
	 */
	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);

	in_remote_transaction = false;

	/*
	 * Process any tables that are being synchronized in parallel, as well as
	 * any newly added tables or sequences.
	 */
	ProcessSyncingRelations(prepare_data.end_lsn);

	/*
	 * Since we have already prepared the transaction, in a case where the
	 * server crashes before clearing the subskiplsn, it will be left but the
	 * transaction won't be resent. But that's okay because it's a rare case
	 * and the subskiplsn will be cleared when finishing the next transaction.
	 */
	stop_skipping_changes();
	clear_subscription_skip_lsn(prepare_data.prepare_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
1398 :
/*
 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
 *
 * Note that we don't need to wait here if the transaction was prepared in a
 * parallel apply worker. In that case, we have already waited for the prepare
 * to finish in apply_handle_stream_prepare() which will ensure all the
 * operations in that transaction have happened in the subscriber, so no
 * concurrent transaction can cause deadlock or transaction dependency issues.
 */
static void
apply_handle_commit_prepared(StringInfo s)
{
	LogicalRepCommitPreparedTxnData prepare_data;
	char		gid[GIDSIZE];

	logicalrep_read_commit_prepared(s, &prepare_data);
	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);

	/* Compute GID for two_phase transactions. */
	TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
						   gid, sizeof(gid));

	/* There is no transaction when COMMIT PREPARED is called */
	begin_replication_step();

	/*
	 * Update origin state so we can restart streaming from correct position
	 * in case of crash.
	 */
	replorigin_xact_state.origin_lsn = prepare_data.end_lsn;
	replorigin_xact_state.origin_timestamp = prepare_data.commit_time;

	FinishPreparedTransaction(gid, true);
	end_replication_step();
	CommitTransactionCommand();
	pgstat_report_stat(false);

	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
	in_remote_transaction = false;

	/*
	 * Process any tables that are being synchronized in parallel, as well as
	 * any newly added tables or sequences.
	 */
	ProcessSyncingRelations(prepare_data.end_lsn);

	clear_subscription_skip_lsn(prepare_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
1450 :
/*
 * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
 *
 * Note that we don't need to wait here if the transaction was prepared in a
 * parallel apply worker. In that case, we have already waited for the prepare
 * to finish in apply_handle_stream_prepare() which will ensure all the
 * operations in that transaction have happened in the subscriber, so no
 * concurrent transaction can cause deadlock or transaction dependency issues.
 */
static void
apply_handle_rollback_prepared(StringInfo s)
{
	LogicalRepRollbackPreparedTxnData rollback_data;
	char		gid[GIDSIZE];

	logicalrep_read_rollback_prepared(s, &rollback_data);
	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);

	/* Compute GID for two_phase transactions. */
	TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
						   gid, sizeof(gid));

	/*
	 * It is possible that we haven't received prepare because it occurred
	 * before walsender reached a consistent point or the two_phase was still
	 * not enabled by that time, so in such cases, we need to skip rollback
	 * prepared.
	 */
	if (LookupGXact(gid, rollback_data.prepare_end_lsn,
					rollback_data.prepare_time))
	{
		/*
		 * Update origin state so we can restart streaming from correct
		 * position in case of crash.
		 */
		replorigin_xact_state.origin_lsn = rollback_data.rollback_end_lsn;
		replorigin_xact_state.origin_timestamp = rollback_data.rollback_time;

		/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
		begin_replication_step();
		FinishPreparedTransaction(gid, false);
		end_replication_step();
		CommitTransactionCommand();

		clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
	}

	pgstat_report_stat(false);

	/*
	 * It is okay not to set the local_end LSN for the rollback of prepared
	 * transaction because we always flush the WAL record for it. See
	 * apply_handle_prepare.
	 */
	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
	in_remote_transaction = false;

	/*
	 * Process any tables that are being synchronized in parallel, as well as
	 * any newly added tables or sequences.
	 */
	ProcessSyncingRelations(rollback_data.rollback_end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
1517 :
1518 : /*
1519 : * Handle STREAM PREPARE.
 : *
 : * The message is dispatched based on the transaction's apply action:
 : * the leader either replays the spooled changes and prepares, forwards
 : * the message to a parallel apply worker, or serializes it to file; a
 : * parallel apply worker prepares the transaction directly.
1520 : */
1521 : static void
1522 17 : apply_handle_stream_prepare(StringInfo s)
1523 : {
1524 : LogicalRepPreparedTxnData prepare_data;
1525 : ParallelApplyWorkerInfo *winfo;
1526 : TransApplyAction apply_action;
1527 :
1528 : /* Save the message before it is consumed. */
1529 17 : StringInfoData original_msg = *s;
1530 :
1531 17 : if (in_streamed_transaction)
1532 0 : ereport(ERROR,
1533 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1534 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1535 :
1536 : /* Tablesync should never receive prepare. */
1537 17 : if (am_tablesync_worker())
1538 0 : ereport(ERROR,
1539 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1540 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1541 :
1542 17 : logicalrep_read_stream_prepare(s, &prepare_data);
1543 17 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1544 :
1545 17 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1546 :
1547 17 : switch (apply_action)
1548 : {
1549 5 : case TRANS_LEADER_APPLY:
1550 :
1551 : /*
1552 : * The transaction has been serialized to file, so replay all the
1553 : * spooled operations.
1554 : */
1555 5 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1556 : prepare_data.xid, prepare_data.prepare_lsn);
1557 :
1558 : /* Mark the transaction as prepared. */
1559 5 : apply_handle_prepare_internal(&prepare_data);
1560 :
1561 5 : CommitTransactionCommand();
1562 :
1563 : /*
1564 : * It is okay not to set the local_end LSN for the prepare because
1565 : * we always flush the prepare record. See apply_handle_prepare.
1566 : */
1567 5 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1568 :
1569 5 : in_remote_transaction = false;
1570 :
1571 : /* Unlink the files with serialized changes and subxact info. */
1572 5 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1573 :
1574 5 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1575 5 : break;
1576 :
1577 5 : case TRANS_LEADER_SEND_TO_PARALLEL:
1578 : Assert(winfo);
1579 :
1580 5 : if (pa_send_data(winfo, s->len, s->data))
1581 : {
1582 : /* Finish processing the streaming transaction. */
1583 5 : pa_xact_finish(winfo, prepare_data.end_lsn);
1584 3 : break;
1585 : }
1586 :
1587 : /*
1588 : * Switch to serialize mode when we are not able to send the
1589 : * change to parallel apply worker.
1590 : */
1591 0 : pa_switch_to_partial_serialize(winfo, true);
1592 :
1593 : pg_fallthrough;
1594 1 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1595 : Assert(winfo);
1596 :
1597 1 : stream_open_and_write_change(prepare_data.xid,
1598 : LOGICAL_REP_MSG_STREAM_PREPARE,
1599 : &original_msg);
1600 :
1601 1 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1602 :
1603 : /* Finish processing the streaming transaction. */
1604 1 : pa_xact_finish(winfo, prepare_data.end_lsn);
1605 1 : break;
1606 :
1607 6 : case TRANS_PARALLEL_APPLY:
1608 :
1609 : /*
1610 : * If the parallel apply worker is applying spooled messages then
1611 : * close the file before preparing.
1612 : */
1613 6 : if (stream_fd)
1614 1 : stream_close_file();
1615 :
1616 6 : begin_replication_step();
1617 :
1618 : /* Mark the transaction as prepared. */
1619 6 : apply_handle_prepare_internal(&prepare_data);
1620 :
1621 6 : end_replication_step();
1622 :
1623 6 : CommitTransactionCommand();
1624 :
1625 : /*
1626 : * It is okay not to set the local_end LSN for the prepare because
1627 : * we always flush the prepare record. See apply_handle_prepare.
1628 : */
1629 4 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1630 :
 : /* State must be set to FINISHED before releasing the lock. */
1631 4 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1632 4 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1633 :
1634 4 : pa_reset_subtrans();
1635 :
1636 4 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1637 4 : break;
1638 :
1639 0 : default:
1640 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1641 : break;
1642 : }
1643 :
1644 13 : pgstat_report_stat(false);
1645 :
1646 : /*
1647 : * Process any tables that are being synchronized in parallel, as well as
1648 : * any newly added tables or sequences.
1649 : */
1650 13 : ProcessSyncingRelations(prepare_data.end_lsn);
1651 :
1652 : /*
1653 : * Similar to prepare case, the subskiplsn could be left in a case of
1654 : * server crash but it's okay. See the comments in apply_handle_prepare().
1655 : */
1656 13 : stop_skipping_changes();
1657 13 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1658 :
1659 13 : pgstat_report_activity(STATE_IDLE, NULL);
1660 :
1661 13 : reset_apply_error_context_info();
1662 13 : }
1663 :
1664 : /*
1665 : * Handle ORIGIN message.
1666 : *
1667 : * TODO, support tracking of multiple origins
1668 : */
1669 : static void
1670 7 : apply_handle_origin(StringInfo s)
1671 : {
1672 : /*
1673 : * ORIGIN message can only come inside streaming transaction or inside
1674 : * remote transaction and before any actual writes.
 : *
 : * NOTE(review): presumably IsTransactionState() being true means a
 : * change has already started the local transaction, i.e. a write has
 : * occurred — confirm against begin_replication_step's callers.
1675 : */
1676 7 : if (!in_streamed_transaction &&
1677 10 : (!in_remote_transaction ||
1678 5 : (IsTransactionState() && !am_tablesync_worker())))
1679 0 : ereport(ERROR,
1680 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1681 : errmsg_internal("ORIGIN message sent out of order")));
1682 7 : }
1683 :
1684 : /*
1685 : * Initialize fileset (if not already done).
1686 : *
1687 : * Create a new file when first_segment is true, otherwise open the existing
1688 : * file.
 : *
 : * Starts a transaction via begin_replication_step; the matching
 : * stream_stop_internal commits it.
1689 : */
1690 : void
1691 363 : stream_start_internal(TransactionId xid, bool first_segment)
1692 : {
1693 363 : begin_replication_step();
1694 :
1695 : /*
1696 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1697 : * used for the entire duration of the worker so create it in a permanent
1698 : * context. We create this on the very first streaming message from any
1699 : * transaction and then use it for this and other streaming transactions.
1700 : * Now, we could create a fileset at the start of the worker as well but
1701 : * then we won't be sure that it will ever be used.
1702 : */
1703 363 : if (!MyLogicalRepWorker->stream_fileset)
1704 : {
1705 : MemoryContext oldctx;
1706 :
1707 14 : oldctx = MemoryContextSwitchTo(ApplyContext);
1708 :
1709 14 : MyLogicalRepWorker->stream_fileset = palloc_object(FileSet);
1710 14 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1711 :
1712 14 : MemoryContextSwitchTo(oldctx);
1713 : }
1714 :
1715 : /* Open the spool file for this transaction. */
1716 363 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1717 :
1718 : /* If this is not the first segment, open existing subxact file. */
1719 363 : if (!first_segment)
1720 331 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1721 :
1722 363 : end_replication_step();
1723 363 : }
1724 :
1725 : /*
1726 : * Handle STREAM START message.
 : *
 : * Depending on the apply action, the leader either serializes the
 : * streamed chunk to file or forwards it to a parallel apply worker
 : * (falling back to partial serialization if the queue is full); a
 : * parallel apply worker starts/continues applying the chunk itself.
1727 : */
1728 : static void
1729 845 : apply_handle_stream_start(StringInfo s)
1730 : {
1731 : bool first_segment;
1732 : ParallelApplyWorkerInfo *winfo;
1733 : TransApplyAction apply_action;
1734 :
1735 : /* Save the message before it is consumed. */
1736 845 : StringInfoData original_msg = *s;
1737 :
1738 845 : if (in_streamed_transaction)
1739 0 : ereport(ERROR,
1740 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1741 : errmsg_internal("duplicate STREAM START message")));
1742 :
1743 : /* There must not be an active streaming transaction. */
1744 : Assert(!TransactionIdIsValid(stream_xid));
1745 :
1746 : /* notify handle methods we're processing a remote transaction */
1747 845 : in_streamed_transaction = true;
1748 :
1749 : /* extract XID of the top-level transaction */
1750 845 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1751 :
1752 845 : if (!TransactionIdIsValid(stream_xid))
1753 0 : ereport(ERROR,
1754 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1755 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1756 :
1757 845 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1758 :
1759 : /* Try to allocate a worker for the streaming transaction. */
1760 845 : if (first_segment)
1761 88 : pa_allocate_worker(stream_xid);
1762 :
1763 845 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1764 :
1765 845 : switch (apply_action)
1766 : {
1767 343 : case TRANS_LEADER_SERIALIZE:
1768 :
1769 : /*
1770 : * Function stream_start_internal starts a transaction. This
1771 : * transaction will be committed on the stream stop unless it is a
1772 : * tablesync worker in which case it will be committed after
1773 : * processing all the messages. We need this transaction for
1774 : * handling the BufFile, used for serializing the streaming data
1775 : * and subxact info.
1776 : */
1777 343 : stream_start_internal(stream_xid, first_segment);
1778 343 : break;
1779 :
1780 245 : case TRANS_LEADER_SEND_TO_PARALLEL:
1781 : Assert(winfo);
1782 :
1783 : /*
1784 : * Once we start serializing the changes, the parallel apply
1785 : * worker will wait for the leader to release the stream lock
1786 : * until the end of the transaction. So, we don't need to release
1787 : * the lock or increment the stream count in that case.
1788 : */
1789 245 : if (pa_send_data(winfo, s->len, s->data))
1790 : {
1791 : /*
1792 : * Unlock the shared object lock so that the parallel apply
1793 : * worker can continue to receive changes.
1794 : */
1795 241 : if (!first_segment)
1796 215 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1797 :
1798 : /*
1799 : * Increment the number of streaming blocks waiting to be
1800 : * processed by parallel apply worker.
1801 : */
1802 241 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1803 :
1804 : /* Cache the parallel apply worker for this transaction. */
1805 241 : pa_set_stream_apply_worker(winfo);
1806 241 : break;
1807 : }
1808 :
1809 : /*
1810 : * Switch to serialize mode when we are not able to send the
1811 : * change to parallel apply worker.
1812 : */
1813 4 : pa_switch_to_partial_serialize(winfo, !first_segment);
1814 :
1815 : pg_fallthrough;
1816 15 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1817 : Assert(winfo);
1818 :
1819 : /*
1820 : * Open the spool file unless it was already opened when switching
1821 : * to serialize mode. The transaction started in
1822 : * stream_start_internal will be committed on the stream stop.
1823 : */
1824 15 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1825 11 : stream_start_internal(stream_xid, first_segment);
1826 :
1827 15 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1828 :
1829 : /* Cache the parallel apply worker for this transaction. */
1830 15 : pa_set_stream_apply_worker(winfo);
1831 15 : break;
1832 :
1833 246 : case TRANS_PARALLEL_APPLY:
1834 246 : if (first_segment)
1835 : {
1836 : /* Hold the lock until the end of the transaction. */
1837 30 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1838 30 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1839 :
1840 : /*
1841 : * Signal the leader apply worker, as it may be waiting for
1842 : * us.
1843 : */
1844 30 : logicalrep_worker_wakeup(WORKERTYPE_APPLY,
1845 30 : MyLogicalRepWorker->subid, InvalidOid);
1846 : }
1847 :
 : /* Reset the per-chunk change counter reported at STREAM STOP. */
1848 246 : parallel_stream_nchanges = 0;
1849 246 : break;
1850 :
1851 0 : default:
1852 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1853 : break;
1854 : }
1855 :
1856 845 : pgstat_report_activity(STATE_RUNNING, NULL);
1857 845 : }
1858 :
1859 : /*
1860 : * Update the information about subxacts and close the file.
1861 : *
1862 : * This function should be called when the stream_start_internal function has
1863 : * been called.
 : *
 : * Counterpart of stream_start_internal: commits the per-stream
 : * transaction started there and releases per-stream memory.
1864 : */
1865 : void
1866 363 : stream_stop_internal(TransactionId xid)
1867 : {
1868 : /*
1869 : * Serialize information about subxacts for the toplevel transaction, then
1870 : * close the stream messages spool file.
1871 : */
1872 363 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1873 363 : stream_close_file();
1874 :
1875 : /* We must be in a valid transaction state */
1876 : Assert(IsTransactionState());
1877 :
1878 : /* Commit the per-stream transaction */
1879 363 : CommitTransactionCommand();
1880 :
1881 : /* Reset per-stream context */
1882 363 : MemoryContextReset(LogicalStreamingContext);
1883 363 : }
1884 :
1885 : /*
1886 : * Handle STREAM STOP message.
 : *
 : * Ends the current streamed chunk: the leader closes/forwards the
 : * chunk, while a parallel apply worker waits for the next chunk (or
 : * the transaction end) from the leader.
1887 : */
1888 : static void
1889 844 : apply_handle_stream_stop(StringInfo s)
1890 : {
1891 : ParallelApplyWorkerInfo *winfo;
1892 : TransApplyAction apply_action;
1893 :
1894 844 : if (!in_streamed_transaction)
1895 0 : ereport(ERROR,
1896 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1897 : errmsg_internal("STREAM STOP message without STREAM START")));
1898 :
1899 844 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1900 :
1901 844 : switch (apply_action)
1902 : {
1903 343 : case TRANS_LEADER_SERIALIZE:
1904 343 : stream_stop_internal(stream_xid);
1905 343 : break;
1906 :
1907 241 : case TRANS_LEADER_SEND_TO_PARALLEL:
1908 : Assert(winfo);
1909 :
1910 : /*
1911 : * Lock before sending the STREAM_STOP message so that the leader
1912 : * can hold the lock first and the parallel apply worker will wait
1913 : * for leader to release the lock. See Locking Considerations atop
1914 : * applyparallelworker.c.
1915 : */
1916 241 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1917 :
1918 241 : if (pa_send_data(winfo, s->len, s->data))
1919 : {
1920 241 : pa_set_stream_apply_worker(NULL);
1921 241 : break;
1922 : }
1923 :
1924 : /*
1925 : * Switch to serialize mode when we are not able to send the
1926 : * change to parallel apply worker.
1927 : */
1928 0 : pa_switch_to_partial_serialize(winfo, true);
1929 :
1930 : pg_fallthrough;
1931 15 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1932 15 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1933 15 : stream_stop_internal(stream_xid);
1934 15 : pa_set_stream_apply_worker(NULL);
1935 15 : break;
1936 :
1937 245 : case TRANS_PARALLEL_APPLY:
1938 245 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1939 : parallel_stream_nchanges);
1940 :
1941 : /*
1942 : * By the time parallel apply worker is processing the changes in
1943 : * the current streaming block, the leader apply worker may have
1944 : * sent multiple streaming blocks. This can lead to parallel apply
1945 : * worker start waiting even when there are more chunk of streams
1946 : * in the queue. So, try to lock only if there is no message left
1947 : * in the queue. See Locking Considerations atop
1948 : * applyparallelworker.c.
1949 : *
1950 : * Note that here we have a race condition where we can start
1951 : * waiting even when there are pending streaming chunks. This can
1952 : * happen if the leader sends another streaming block and acquires
1953 : * the stream lock again after the parallel apply worker checks
1954 : * that there is no pending streaming block and before it actually
1955 : * starts waiting on a lock. We can handle this case by not
1956 : * allowing the leader to increment the stream block count during
1957 : * the time parallel apply worker acquires the lock but it is not
1958 : * clear whether that is worth the complexity.
1959 : *
1960 : * Now, if this missed chunk contains rollback to savepoint, then
1961 : * there is a risk of deadlock which probably shouldn't happen
1962 : * after restart.
1963 : */
1964 245 : pa_decr_and_wait_stream_block();
1965 243 : break;
1966 :
1967 0 : default:
1968 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1969 : break;
1970 : }
1971 :
1972 842 : in_streamed_transaction = false;
1973 842 : stream_xid = InvalidTransactionId;
1974 :
1975 : /*
1976 : * The parallel apply worker could be in a transaction in which case we
1977 : * need to report the state as STATE_IDLEINTRANSACTION.
1978 : */
1979 842 : if (IsTransactionOrTransactionBlock())
1980 243 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1981 : else
1982 599 : pgstat_report_activity(STATE_IDLE, NULL);
1983 :
1984 842 : reset_apply_error_context_info();
1985 842 : }
1986 :
1987 : /*
1988 : * Helper function to handle STREAM ABORT message when the transaction was
1989 : * serialized to file.
 : *
 : * Aborting a subtransaction is implemented by truncating the spooled
 : * changes file back to the offset recorded when that subxact started,
 : * so its changes are never replayed.
1990 : */
1991 : static void
1992 14 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1993 : {
1994 : /*
1995 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1996 : * just delete the files with serialized info.
1997 : */
1998 14 : if (xid == subxid)
1999 1 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2000 : else
2001 : {
2002 : /*
2003 : * OK, so it's a subxact. We need to read the subxact file for the
2004 : * toplevel transaction, determine the offset tracked for the subxact,
2005 : * and truncate the file with changes. We also remove the subxacts
2006 : * with higher offsets (or rather higher XIDs).
2007 : *
2008 : * We intentionally scan the array from the tail, because we're likely
2009 : * aborting a change for the most recent subtransactions.
2010 : *
2011 : * We can't use the binary search here as subxact XIDs won't
2012 : * necessarily arrive in sorted order, consider the case where we have
2013 : * released the savepoint for multiple subtransactions and then
2014 : * performed rollback to savepoint for one of the earlier
2015 : * sub-transaction.
2016 : */
2017 : int64 i;
2018 : int64 subidx;
2019 : BufFile *fd;
2020 13 : bool found = false;
2021 : char path[MAXPGPATH];
2022 :
2023 13 : subidx = -1;
2024 13 : begin_replication_step();
2025 13 : subxact_info_read(MyLogicalRepWorker->subid, xid);
2026 :
2027 15 : for (i = subxact_data.nsubxacts; i > 0; i--)
2028 : {
2029 11 : if (subxact_data.subxacts[i - 1].xid == subxid)
2030 : {
2031 9 : subidx = (i - 1);
2032 9 : found = true;
2033 9 : break;
2034 : }
2035 : }
2036 :
2037 : /*
2038 : * If it's an empty sub-transaction then we will not find the subxid
2039 : * here so just cleanup the subxact info and return.
2040 : */
2041 13 : if (!found)
2042 : {
2043 : /* Cleanup the subxact info */
2044 4 : cleanup_subxact_info();
2045 4 : end_replication_step();
2046 4 : CommitTransactionCommand();
2047 4 : return;
2048 : }
2049 :
2050 : /* open the changes file */
2051 9 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2052 9 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
2053 : O_RDWR, false);
2054 :
2055 : /* OK, truncate the file at the right offset */
2056 9 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
2057 9 : subxact_data.subxacts[subidx].offset);
2058 9 : BufFileClose(fd);
2059 :
2060 : /* discard the subxacts added later */
2061 9 : subxact_data.nsubxacts = subidx;
2062 :
2063 : /* write the updated subxact list */
2064 9 : subxact_info_write(MyLogicalRepWorker->subid, xid);
2065 :
2066 9 : end_replication_step();
2067 9 : CommitTransactionCommand();
2068 : }
2069 : }
2070 :
2071 : /*
2072 : * Handle STREAM ABORT message.
 : *
 : * An abort may target the toplevel transaction (xid == subxid) or a
 : * single subtransaction; handling differs per apply action, from file
 : * truncation (leader) to rolling back in the parallel apply worker.
2073 : */
2074 : static void
2075 38 : apply_handle_stream_abort(StringInfo s)
2076 : {
2077 : TransactionId xid;
2078 : TransactionId subxid;
2079 : LogicalRepStreamAbortData abort_data;
2080 : ParallelApplyWorkerInfo *winfo;
2081 : TransApplyAction apply_action;
2082 :
2083 : /* Save the message before it is consumed. */
2084 38 : StringInfoData original_msg = *s;
2085 : bool toplevel_xact;
2086 :
2087 38 : if (in_streamed_transaction)
2088 0 : ereport(ERROR,
2089 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2090 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
2091 :
2092 : /* We receive abort information only when we can apply in parallel. */
2093 38 : logicalrep_read_stream_abort(s, &abort_data,
2094 38 : MyLogicalRepWorker->parallel_apply);
2095 :
2096 38 : xid = abort_data.xid;
2097 38 : subxid = abort_data.subxid;
2098 38 : toplevel_xact = (xid == subxid);
2099 :
2100 38 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2101 :
2102 38 : apply_action = get_transaction_apply_action(xid, &winfo);
2103 :
2104 38 : switch (apply_action)
2105 : {
2106 14 : case TRANS_LEADER_APPLY:
2107 :
2108 : /*
2109 : * We are in the leader apply worker and the transaction has been
2110 : * serialized to file.
2111 : */
2112 14 : stream_abort_internal(xid, subxid);
2113 :
2114 14 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2115 14 : break;
2116 :
2117 10 : case TRANS_LEADER_SEND_TO_PARALLEL:
2118 : Assert(winfo);
2119 :
2120 : /*
2121 : * For the case of aborting the subtransaction, we increment the
2122 : * number of streaming blocks and take the lock again before
2123 : * sending the STREAM_ABORT to ensure that the parallel apply
2124 : * worker will wait on the lock for the next set of changes after
2125 : * processing the STREAM_ABORT message if it is not already
2126 : * waiting for STREAM_STOP message.
2127 : *
2128 : * It is important to perform this locking before sending the
2129 : * STREAM_ABORT message so that the leader can hold the lock first
2130 : * and the parallel apply worker will wait for the leader to
2131 : * release the lock. This is the same as what we do in
2132 : * apply_handle_stream_stop. See Locking Considerations atop
2133 : * applyparallelworker.c.
2134 : */
2135 10 : if (!toplevel_xact)
2136 : {
2137 9 : pa_unlock_stream(xid, AccessExclusiveLock);
2138 9 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
2139 9 : pa_lock_stream(xid, AccessExclusiveLock);
2140 : }
2141 :
2142 10 : if (pa_send_data(winfo, s->len, s->data))
2143 : {
2144 : /*
2145 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2146 : * wait here for the parallel apply worker to finish as that
2147 : * is not required to maintain the commit order and won't have
2148 : * the risk of failures due to transaction dependencies and
2149 : * deadlocks. However, it is possible that before the parallel
2150 : * worker finishes and we clear the worker info, the xid
2151 : * wraparound happens on the upstream and a new transaction
2152 : * with the same xid can appear and that can lead to duplicate
2153 : * entries in ParallelApplyTxnHash. Yet another problem could
2154 : * be that we may have serialized the changes in partial
2155 : * serialize mode and the file containing xact changes may
2156 : * already exist, and after xid wraparound trying to create
2157 : * the file for the same xid can lead to an error. To avoid
2158 : * these problems, we decide to wait for the aborts to finish.
2159 : *
2160 : * Note, it is okay to not update the flush location position
2161 : * for aborts as in worst case that means such a transaction
2162 : * won't be sent again after restart.
2163 : */
2164 10 : if (toplevel_xact)
2165 1 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2166 :
2167 10 : break;
2168 : }
2169 :
2170 : /*
2171 : * Switch to serialize mode when we are not able to send the
2172 : * change to parallel apply worker.
2173 : */
2174 0 : pa_switch_to_partial_serialize(winfo, true);
2175 :
2176 : pg_fallthrough;
2177 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2178 : Assert(winfo);
2179 :
2180 : /*
2181 : * Parallel apply worker might have applied some changes, so write
2182 : * the STREAM_ABORT message so that it can rollback the
2183 : * subtransaction if needed.
2184 : */
2185 2 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
2186 : &original_msg);
2187 :
2188 2 : if (toplevel_xact)
2189 : {
2190 1 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2191 1 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2192 : }
2193 2 : break;
2194 :
2195 12 : case TRANS_PARALLEL_APPLY:
2196 :
2197 : /*
2198 : * If the parallel apply worker is applying spooled messages then
2199 : * close the file before aborting.
2200 : */
2201 12 : if (toplevel_xact && stream_fd)
2202 1 : stream_close_file();
2203 :
2204 12 : pa_stream_abort(&abort_data);
2205 :
2206 : /*
2207 : * We need to wait after processing rollback to savepoint for the
2208 : * next set of changes.
2209 : *
2210 : * We have a race condition here due to which we can start waiting
2211 : * here when there are more chunk of streams in the queue. See
2212 : * apply_handle_stream_stop.
2213 : */
2214 12 : if (!toplevel_xact)
2215 10 : pa_decr_and_wait_stream_block();
2216 :
2217 12 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2218 12 : break;
2219 :
2220 0 : default:
2221 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2222 : break;
2223 : }
2224 :
2225 38 : reset_apply_error_context_info();
2226 38 : }
2227 :
2228 : /*
2229 : * Ensure that the passed location is fileset's end.
 : *
 : * Errors out if (fileno, offset) does not match the end of the changes
 : * file for the given xid, i.e. if any message follows the one that was
 : * just processed.
2230 : */
2231 : static void
2232 4 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2233 : pgoff_t offset)
2234 : {
2235 : char path[MAXPGPATH];
2236 : BufFile *fd;
2237 : int last_fileno;
2238 : pgoff_t last_offset;
2239 :
2240 : Assert(!IsTransactionState());
2241 :
2242 4 : begin_replication_step();
2243 :
2244 4 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2245 :
2246 4 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2247 :
 : /* Seek to EOF and record its position for comparison below. */
2248 4 : BufFileSeek(fd, 0, 0, SEEK_END);
2249 4 : BufFileTell(fd, &last_fileno, &last_offset);
2250 :
2251 4 : BufFileClose(fd);
2252 :
2253 4 : end_replication_step();
2254 :
2255 4 : if (last_fileno != fileno || last_offset != offset)
2256 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2257 : path);
2258 4 : }
2259 :
2260 : /*
2261 : * Common spoolfile processing.
 : *
 : * Replays all changes serialized to the fileset for the given xid by
 : * reading length-prefixed records and feeding each one to
 : * apply_dispatch. A transaction-end message closes stream_fd, which is
 : * how the loop detects that it must be the last record.
2262 : */
2263 : void
2264 31 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2265 : XLogRecPtr lsn)
2266 : {
2267 : int nchanges;
2268 : char path[MAXPGPATH];
2269 31 : char *buffer = NULL;
2270 : MemoryContext oldcxt;
2271 : ResourceOwner oldowner;
2272 : int fileno;
2273 : pgoff_t offset;
2274 :
2275 31 : if (!am_parallel_apply_worker())
2276 27 : maybe_start_skipping_changes(lsn);
2277 :
2278 : /* Make sure we have an open transaction */
2279 31 : begin_replication_step();
2280 :
2281 : /*
2282 : * Allocate file handle and memory required to process all the messages in
2283 : * TopTransactionContext to avoid them getting reset after each message is
2284 : * processed.
2285 : */
2286 31 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2287 :
2288 : /* Open the spool file for the committed/prepared transaction */
2289 31 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2290 31 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2291 :
2292 : /*
2293 : * Make sure the file is owned by the toplevel transaction so that the
2294 : * file will not be accidentally closed when aborting a subtransaction.
2295 : */
2296 31 : oldowner = CurrentResourceOwner;
2297 31 : CurrentResourceOwner = TopTransactionResourceOwner;
2298 :
2299 31 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2300 :
2301 31 : CurrentResourceOwner = oldowner;
2302 :
2303 31 : buffer = palloc(BLCKSZ);
2304 :
2305 31 : MemoryContextSwitchTo(oldcxt);
2306 :
2307 31 : remote_final_lsn = lsn;
2308 :
2309 : /*
2310 : * Make sure the handle apply_dispatch methods are aware we're in a remote
2311 : * transaction.
2312 : */
2313 31 : in_remote_transaction = true;
2314 31 : pgstat_report_activity(STATE_RUNNING, NULL);
2315 :
2316 31 : end_replication_step();
2317 :
2318 : /*
2319 : * Read the entries one by one and pass them through the same logic as in
2320 : * apply_dispatch.
2321 : */
2322 31 : nchanges = 0;
2323 : while (true)
2324 88470 : {
2325 : StringInfoData s2;
2326 : size_t nbytes;
2327 : int len;
2328 :
2329 88501 : CHECK_FOR_INTERRUPTS();
2330 :
2331 : /* read length of the on-disk record */
2332 88501 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2333 :
2334 : /* have we reached end of the file? */
2335 88501 : if (nbytes == 0)
2336 26 : break;
2337 :
2338 : /* do we have a correct length? */
2339 88475 : if (len <= 0)
2340 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2341 : len, path);
2342 :
2343 : /* make sure we have sufficiently large buffer */
2344 88475 : buffer = repalloc(buffer, len);
2345 :
2346 : /* and finally read the data into the buffer */
2347 88475 : BufFileReadExact(stream_fd, buffer, len);
2348 :
2349 88475 : BufFileTell(stream_fd, &fileno, &offset);
2350 :
2351 : /* init a stringinfo using the buffer and call apply_dispatch */
2352 88475 : initReadOnlyStringInfo(&s2, buffer, len);
2353 :
2354 : /* Ensure we are reading the data into our memory context. */
2355 88475 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2356 :
2357 88475 : apply_dispatch(&s2);
2358 :
2359 88474 : MemoryContextReset(ApplyMessageContext);
2360 :
2361 88474 : MemoryContextSwitchTo(oldcxt);
2362 :
2363 88474 : nchanges++;
2364 :
2365 : /*
2366 : * It is possible the file has been closed because we have processed
2367 : * the transaction end message like stream_commit in which case that
2368 : * must be the last message.
2369 : */
2370 88474 : if (!stream_fd)
2371 : {
2372 4 : ensure_last_message(stream_fileset, xid, fileno, offset);
2373 4 : break;
2374 : }
2375 :
2376 88470 : if (nchanges % 1000 == 0)
2377 84 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2378 : nchanges, path);
2379 : }
2380 :
2381 30 : if (stream_fd)
2382 26 : stream_close_file();
2383 :
2384 30 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2385 : nchanges, path);
2386 :
2387 30 : return;
2388 : }
2389 :
2390 : /*
2391 : * Handle STREAM COMMIT message.
 : *
 : * Mirrors apply_handle_stream_prepare's routing: the leader replays
 : * the spooled transaction and commits, forwards to (or serializes for)
 : * a parallel apply worker; a parallel apply worker commits directly.
2392 : */
2393 : static void
2394 61 : apply_handle_stream_commit(StringInfo s)
2395 : {
2396 : TransactionId xid;
2397 : LogicalRepCommitData commit_data;
2398 : ParallelApplyWorkerInfo *winfo;
2399 : TransApplyAction apply_action;
2400 :
2401 : /* Save the message before it is consumed. */
2402 61 : StringInfoData original_msg = *s;
2403 :
2404 61 : if (in_streamed_transaction)
2405 0 : ereport(ERROR,
2406 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2407 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2408 :
2409 61 : xid = logicalrep_read_stream_commit(s, &commit_data);
2410 61 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2411 :
2412 61 : apply_action = get_transaction_apply_action(xid, &winfo);
2413 :
2414 61 : switch (apply_action)
2415 : {
2416 22 : case TRANS_LEADER_APPLY:
2417 :
2418 : /*
2419 : * The transaction has been serialized to file, so replay all the
2420 : * spooled operations.
2421 : */
2422 22 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2423 : commit_data.commit_lsn);
2424 :
2425 21 : apply_handle_commit_internal(&commit_data);
2426 :
2427 : /* Unlink the files with serialized changes and subxact info. */
2428 21 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2429 :
2430 21 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2431 21 : break;
2432 :
2433 18 : case TRANS_LEADER_SEND_TO_PARALLEL:
2434 : Assert(winfo);
2435 :
2436 18 : if (pa_send_data(winfo, s->len, s->data))
2437 : {
2438 : /* Finish processing the streaming transaction. */
2439 18 : pa_xact_finish(winfo, commit_data.end_lsn);
2440 17 : break;
2441 : }
2442 :
2443 : /*
2444 : * Switch to serialize mode when we are not able to send the
2445 : * change to parallel apply worker.
2446 : */
2447 0 : pa_switch_to_partial_serialize(winfo, true);
2448 :
2449 : pg_fallthrough;
2450 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2451 : Assert(winfo);
2452 :
2453 2 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2454 : &original_msg);
2455 :
2456 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2457 :
2458 : /* Finish processing the streaming transaction. */
2459 2 : pa_xact_finish(winfo, commit_data.end_lsn);
2460 2 : break;
2461 :
2462 19 : case TRANS_PARALLEL_APPLY:
2463 :
2464 : /*
2465 : * If the parallel apply worker is applying spooled messages then
2466 : * close the file before committing.
2467 : */
2468 19 : if (stream_fd)
2469 2 : stream_close_file();
2470 :
2471 19 : apply_handle_commit_internal(&commit_data);
2472 :
 : /* Publish the local commit end LSN for the leader to flush. */
2473 19 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2474 :
2475 : /*
2476 : * It is important to set the transaction state as finished before
2477 : * releasing the lock. See pa_wait_for_xact_finish.
2478 : */
2479 19 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2480 19 : pa_unlock_transaction(xid, AccessExclusiveLock);
2481 :
2482 19 : pa_reset_subtrans();
2483 :
2484 19 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2485 19 : break;
2486 :
2487 0 : default:
2488 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2489 : break;
2490 : }
2491 :
2492 : /*
2493 : * Process any tables that are being synchronized in parallel, as well as
2494 : * any newly added tables or sequences.
2495 : */
2496 59 : ProcessSyncingRelations(commit_data.end_lsn);
2497 :
2498 59 : pgstat_report_activity(STATE_IDLE, NULL);
2499 :
2500 59 : reset_apply_error_context_info();
2501 59 : }
2502 :
2503 : /*
2504 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
 : *
 : * Commits the local transaction (if any), records replication origin
 : * progress, and advances the flush position.
2505 : */
2506 : static void
2507 482 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2508 : {
2509 482 : if (is_skipping_changes())
2510 : {
2511 2 : stop_skipping_changes();
2512 :
2513 : /*
2514 : * Start a new transaction to clear the subskiplsn, if not started
2515 : * yet.
2516 : */
2517 2 : if (!IsTransactionState())
2518 1 : StartTransactionCommand();
2519 : }
2520 :
2521 482 : if (IsTransactionState())
2522 : {
2523 : /*
2524 : * The transaction is either non-empty or skipped, so we clear the
2525 : * subskiplsn.
2526 : */
2527 482 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2528 :
2529 : /*
2530 : * Update origin state so we can restart streaming from correct
2531 : * position in case of crash.
2532 : */
2533 482 : replorigin_xact_state.origin_lsn = commit_data->end_lsn;
2534 482 : replorigin_xact_state.origin_timestamp = commit_data->committime;
2535 :
2536 482 : CommitTransactionCommand();
2537 :
2538 482 : if (IsTransactionBlock())
2539 : {
2540 4 : EndTransactionBlock(false);
2541 4 : CommitTransactionCommand();
2542 : }
2543 :
2544 482 : pgstat_report_stat(false);
2545 :
2546 482 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2547 : }
2548 : else
2549 : {
 : /*
 : * NOTE(review): no local transaction is open here — presumably the
 : * remote transaction was empty so nothing was applied; confirm
 : * against the handlers that lazily start the transaction.
 : */
2550 : /* Process any invalidation messages that might have accumulated. */
2551 0 : AcceptInvalidationMessages();
2552 0 : maybe_reread_subscription();
2553 : }
2554 :
2555 482 : in_remote_transaction = false;
2556 482 : }
2557 :
2558 : /*
2559 : * Handle RELATION message.
2560 : *
2561 : * Note we don't do validation against local schema here. The validation
2562 : * against local schema is postponed until first change for given relation
2563 : * comes as we only care about it when applying changes for it anyway and we
2564 : * do less locking this way.
2565 : */
2566 : static void
2567 480 : apply_handle_relation(StringInfo s)
2568 : {
2569 : LogicalRepRelation *rel;
2570 :
2571 480 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2572 36 : return;
2573 :
2574 444 : rel = logicalrep_read_rel(s);
2575 444 : logicalrep_relmap_update(rel);
2576 :
2577 : /* Also reset all entries in the partition map that refer to remoterel. */
2578 444 : logicalrep_partmap_reset_relmap(rel);
2579 : }
2580 :
2581 : /*
2582 : * Handle TYPE message.
2583 : *
2584 : * This implementation pays no attention to TYPE messages; we expect the user
2585 : * to have set things up so that the incoming data is acceptable to the input
2586 : * functions for the locally subscribed tables. Hence, we just read and
2587 : * discard the message.
2588 : */
2589 : static void
2590 18 : apply_handle_type(StringInfo s)
2591 : {
2592 : LogicalRepTyp typ;
2593 :
2594 18 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2595 0 : return;
2596 :
2597 18 : logicalrep_read_typ(s, &typ);
2598 : }
2599 :
2600 : /*
2601 : * Check that we (the subscription owner) have sufficient privileges on the
2602 : * target relation to perform the given operation.
2603 : */
2604 : static void
2605 220722 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2606 : {
2607 : Oid relid;
2608 : AclResult aclresult;
2609 :
2610 220722 : relid = RelationGetRelid(rel);
2611 220722 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2612 220722 : if (aclresult != ACLCHECK_OK)
2613 9 : aclcheck_error(aclresult,
2614 9 : get_relkind_objtype(rel->rd_rel->relkind),
2615 9 : get_rel_name(relid));
2616 :
2617 : /*
2618 : * We lack the infrastructure to honor RLS policies. It might be possible
2619 : * to add such infrastructure here, but tablesync workers lack it, too, so
2620 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2621 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2622 : * replicate subsequent INSERTs, so we forbid all commands the same.
2623 : */
2624 220713 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2625 3 : ereport(ERROR,
2626 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2627 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2628 : GetUserNameFromId(GetUserId(), true),
2629 : RelationGetRelationName(rel))));
2630 220710 : }
2631 :
/*
 * Handle INSERT message.
 *
 * Reads the new tuple from the message, opens the target relation, and
 * applies the insert (routing to the right partition when the target is
 * partitioned).  Runs user-supplied code as the table owner unless the
 * subscription's run_as_owner option says otherwise.
 */

static void
apply_handle_insert(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData newtup;
	LogicalRepRelId relid;
	UserContext ucxt;
	ApplyExecutionData *edata;
	EState	   *estate;
	TupleTableSlot *remoteslot;
	MemoryContext oldctx;
	bool		run_as_owner;

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
		return;

	begin_replication_step();

	relid = logicalrep_read_insert(s, &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/*
	 * Make sure that any user-supplied code runs as the table owner, unless
	 * the user has opted out of that behavior.
	 */
	run_as_owner = MySubscription->runasowner;
	if (!run_as_owner)
		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);

	/* Set relation for error callback */
	apply_error_callback_arg.rel = rel;

	/* Initialize the executor state. */
	edata = create_edata_for_relation(rel);
	estate = edata->estate;
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);

	/* Process and store remote tuple in the slot */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_data(remoteslot, rel, &newtup);
	slot_fill_defaults(rel, estate, remoteslot);
	MemoryContextSwitchTo(oldctx);

	/* For a partitioned table, insert the tuple into a partition. */
	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		apply_handle_tuple_routing(edata,
								   remoteslot, NULL, CMD_INSERT);
	else
	{
		ResultRelInfo *relinfo = edata->targetRelInfo;

		ExecOpenIndices(relinfo, false);
		apply_handle_insert_internal(edata, relinfo, remoteslot);
		ExecCloseIndices(relinfo);
	}

	finish_edata(edata);

	/* Reset relation for error callback */
	apply_error_callback_arg.rel = NULL;

	if (!run_as_owner)
		RestoreUserContext(&ucxt);

	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
2721 :
2722 : /*
2723 : * Workhorse for apply_handle_insert()
2724 : * relinfo is for the relation we're actually inserting into
2725 : * (could be a child partition of edata->targetRelInfo)
2726 : */
2727 : static void
2728 76189 : apply_handle_insert_internal(ApplyExecutionData *edata,
2729 : ResultRelInfo *relinfo,
2730 : TupleTableSlot *remoteslot)
2731 : {
2732 76189 : EState *estate = edata->estate;
2733 :
2734 : /* Caller should have opened indexes already. */
2735 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2736 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2737 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2738 :
2739 : /* Caller will not have done this bit. */
2740 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2741 76189 : InitConflictIndexes(relinfo);
2742 :
2743 : /* Do the insert. */
2744 76189 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2745 76182 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2746 76151 : }
2747 :
2748 : /*
2749 : * Check if the logical replication relation is updatable and throw
2750 : * appropriate error if it isn't.
2751 : */
2752 : static void
2753 72292 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2754 : {
2755 : /*
2756 : * For partitioned tables, we only need to care if the target partition is
2757 : * updatable (aka has PK or RI defined for it).
2758 : */
2759 72292 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2760 30 : return;
2761 :
2762 : /* Updatable, no error. */
2763 72262 : if (rel->updatable)
2764 72262 : return;
2765 :
2766 : /*
2767 : * We are in error mode so it's fine this is somewhat slow. It's better to
2768 : * give user correct error.
2769 : */
2770 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2771 : {
2772 0 : ereport(ERROR,
2773 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2774 : errmsg("publisher did not send replica identity column "
2775 : "expected by the logical replication target relation \"%s.%s\"",
2776 : rel->remoterel.nspname, rel->remoterel.relname)));
2777 : }
2778 :
2779 0 : ereport(ERROR,
2780 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2781 : errmsg("logical replication target relation \"%s.%s\" has "
2782 : "neither REPLICA IDENTITY index nor PRIMARY "
2783 : "KEY and published relation does not have "
2784 : "REPLICA IDENTITY FULL",
2785 : rel->remoterel.nspname, rel->remoterel.relname)));
2786 : }
2787 :
/*
 * Handle UPDATE message.
 *
 * Reads the old and new tuples, opens the target relation, populates the
 * updatedCols set so triggers and index hints behave correctly, and applies
 * the update (routing to the right partition when the target is
 * partitioned).
 *
 * TODO: FDW support
 */
static void
apply_handle_update(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepRelId relid;
	UserContext ucxt;
	ApplyExecutionData *edata;
	EState	   *estate;
	LogicalRepTupleData oldtup;
	LogicalRepTupleData newtup;
	bool		has_oldtup;
	TupleTableSlot *remoteslot;
	RTEPermissionInfo *target_perminfo;
	MemoryContext oldctx;
	bool		run_as_owner;

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
		return;

	begin_replication_step();

	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
								   &newtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/* Set relation for error callback */
	apply_error_callback_arg.rel = rel;

	/* Check if we can do the update. */
	check_relation_updatable(rel);

	/*
	 * Make sure that any user-supplied code runs as the table owner, unless
	 * the user has opted out of that behavior.
	 */
	run_as_owner = MySubscription->runasowner;
	if (!run_as_owner)
		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);

	/* Initialize the executor state. */
	edata = create_edata_for_relation(rel);
	estate = edata->estate;
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);

	/*
	 * Populate updatedCols so that per-column triggers can fire, and so
	 * executor can correctly pass down indexUnchanged hint.  This could
	 * include more columns than were actually changed on the publisher
	 * because the logical replication protocol doesn't contain that
	 * information.  But it would for example exclude columns that only exist
	 * on the subscriber, since we are not touching those.
	 */
	target_perminfo = list_nth(estate->es_rteperminfos, 0);
	for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
	{
		CompactAttribute *att = TupleDescCompactAttr(remoteslot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap->attnums[i];

		if (!att->attisdropped && remoteattnum >= 0)
		{
			Assert(remoteattnum < newtup.ncols);
			if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
				target_perminfo->updatedCols =
					bms_add_member(target_perminfo->updatedCols,
								   i + 1 - FirstLowInvalidHeapAttributeNumber);
		}
	}

	/* Build the search tuple. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_data(remoteslot, rel,
					has_oldtup ? &oldtup : &newtup);
	MemoryContextSwitchTo(oldctx);

	/* For a partitioned table, apply update to correct partition. */
	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		apply_handle_tuple_routing(edata,
								   remoteslot, &newtup, CMD_UPDATE);
	else
		apply_handle_update_internal(edata, edata->targetRelInfo,
									 remoteslot, &newtup, rel->localindexoid);

	finish_edata(edata);

	/* Reset relation for error callback */
	apply_error_callback_arg.rel = NULL;

	if (!run_as_owner)
		RestoreUserContext(&ucxt);

	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
2904 :
/*
 * Workhorse for apply_handle_update()
 * relinfo is for the relation we're actually updating in
 * (could be a child partition of edata->targetRelInfo)
 *
 * Locates the target tuple via replica identity/PK/index/seqscan, reports
 * any origin-differs / update_deleted / update_missing conflict, and
 * performs the update when the tuple is found.
 */
static void
apply_handle_update_internal(ApplyExecutionData *edata,
							 ResultRelInfo *relinfo,
							 TupleTableSlot *remoteslot,
							 LogicalRepTupleData *newtup,
							 Oid localindexoid)
{
	EState	   *estate = edata->estate;
	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
	Relation	localrel = relinfo->ri_RelationDesc;
	EPQState	epqstate;
	TupleTableSlot *localslot = NULL;
	ConflictTupleInfo conflicttuple = {0};
	bool		found;
	MemoryContext oldctx;

	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
	ExecOpenIndices(relinfo, false);

	found = FindReplTupleInLocalRel(edata, localrel,
									&relmapentry->remoterel,
									localindexoid,
									remoteslot, &localslot);

	/*
	 * Tuple found.
	 *
	 * Note this will fail if there are other conflicting unique indexes.
	 */
	if (found)
	{
		/*
		 * Report the conflict if the tuple was modified by a different
		 * origin.
		 */
		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
									&conflicttuple.origin, &conflicttuple.ts) &&
			conflicttuple.origin != replorigin_xact_state.origin)
		{
			TupleTableSlot *newslot;

			/* Store the new tuple for conflict reporting */
			newslot = table_slot_create(localrel, &estate->es_tupleTable);
			slot_store_data(newslot, relmapentry, newtup);

			conflicttuple.slot = localslot;

			ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
								remoteslot, newslot,
								list_make1(&conflicttuple));
		}

		/* Process and store remote tuple in the slot */
		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
		slot_modify_data(remoteslot, localslot, relmapentry, newtup);
		MemoryContextSwitchTo(oldctx);

		EvalPlanQualSetSlot(&epqstate, remoteslot);

		InitConflictIndexes(relinfo);

		/* Do the actual update. */
		TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
		ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
								 remoteslot);
	}
	else
	{
		ConflictType type;
		TupleTableSlot *newslot = localslot;

		/*
		 * Detecting whether the tuple was recently deleted or never existed
		 * is crucial to avoid misleading the user during conflict handling.
		 */
		if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
									   &conflicttuple.xmin,
									   &conflicttuple.origin,
									   &conflicttuple.ts) &&
			conflicttuple.origin != replorigin_xact_state.origin)
			type = CT_UPDATE_DELETED;
		else
			type = CT_UPDATE_MISSING;

		/* Store the new tuple for conflict reporting */
		slot_store_data(newslot, relmapentry, newtup);

		/*
		 * The tuple to be updated could not be found or was deleted.  Do
		 * nothing except for emitting a log message.
		 */
		ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
							list_make1(&conflicttuple));
	}

	/* Cleanup. */
	ExecCloseIndices(relinfo);
	EvalPlanQualEnd(&epqstate);
}
3009 :
/*
 * Handle DELETE message.
 *
 * Reads the old tuple (replica identity columns), opens the target
 * relation, and applies the delete (routing to the right partition when
 * the target is partitioned).
 *
 * TODO: FDW support
 */
static void
apply_handle_delete(StringInfo s)
{
	LogicalRepRelMapEntry *rel;
	LogicalRepTupleData oldtup;
	LogicalRepRelId relid;
	UserContext ucxt;
	ApplyExecutionData *edata;
	EState	   *estate;
	TupleTableSlot *remoteslot;
	MemoryContext oldctx;
	bool		run_as_owner;

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
		return;

	begin_replication_step();

	relid = logicalrep_read_delete(s, &oldtup);
	rel = logicalrep_rel_open(relid, RowExclusiveLock);
	if (!should_apply_changes_for_rel(rel))
	{
		/*
		 * The relation can't become interesting in the middle of the
		 * transaction so it's safe to unlock it.
		 */
		logicalrep_rel_close(rel, RowExclusiveLock);
		end_replication_step();
		return;
	}

	/* Set relation for error callback */
	apply_error_callback_arg.rel = rel;

	/* Check if we can do the delete. */
	check_relation_updatable(rel);

	/*
	 * Make sure that any user-supplied code runs as the table owner, unless
	 * the user has opted out of that behavior.
	 */
	run_as_owner = MySubscription->runasowner;
	if (!run_as_owner)
		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);

	/* Initialize the executor state. */
	edata = create_edata_for_relation(rel);
	estate = edata->estate;
	remoteslot = ExecInitExtraTupleSlot(estate,
										RelationGetDescr(rel->localrel),
										&TTSOpsVirtual);

	/* Build the search tuple. */
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	slot_store_data(remoteslot, rel, &oldtup);
	MemoryContextSwitchTo(oldctx);

	/* For a partitioned table, apply delete to correct partition. */
	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		apply_handle_tuple_routing(edata,
								   remoteslot, NULL, CMD_DELETE);
	else
	{
		ResultRelInfo *relinfo = edata->targetRelInfo;

		ExecOpenIndices(relinfo, false);
		apply_handle_delete_internal(edata, relinfo,
									 remoteslot, rel->localindexoid);
		ExecCloseIndices(relinfo);
	}

	finish_edata(edata);

	/* Reset relation for error callback */
	apply_error_callback_arg.rel = NULL;

	if (!run_as_owner)
		RestoreUserContext(&ucxt);

	logicalrep_rel_close(rel, NoLock);

	end_replication_step();
}
3103 :
/*
 * Workhorse for apply_handle_delete()
 * relinfo is for the relation we're actually deleting from
 * (could be a child partition of edata->targetRelInfo)
 *
 * Locates the target tuple, reports delete_origin_differs/delete_missing
 * conflicts as appropriate, and performs the delete when the tuple is
 * found.
 */
static void
apply_handle_delete_internal(ApplyExecutionData *edata,
							 ResultRelInfo *relinfo,
							 TupleTableSlot *remoteslot,
							 Oid localindexoid)
{
	EState	   *estate = edata->estate;
	Relation	localrel = relinfo->ri_RelationDesc;
	LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
	EPQState	epqstate;
	TupleTableSlot *localslot;
	ConflictTupleInfo conflicttuple = {0};
	bool		found;

	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);

	/* Caller should have opened indexes already. */
	Assert(relinfo->ri_IndexRelationDescs != NULL ||
		   !localrel->rd_rel->relhasindex ||
		   RelationGetIndexList(localrel) == NIL);

	found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
									remoteslot, &localslot);

	/* If found delete it. */
	if (found)
	{
		/*
		 * Report the conflict if the tuple was modified by a different
		 * origin.
		 */
		if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
									&conflicttuple.origin, &conflicttuple.ts) &&
			conflicttuple.origin != replorigin_xact_state.origin)
		{
			conflicttuple.slot = localslot;
			ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
								remoteslot, NULL,
								list_make1(&conflicttuple));
		}

		EvalPlanQualSetSlot(&epqstate, localslot);

		/* Do the actual delete. */
		TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
		ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
	}
	else
	{
		/*
		 * The tuple to be deleted could not be found.  Do nothing except for
		 * emitting a log message.
		 */
		ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
							remoteslot, NULL, list_make1(&conflicttuple));
	}

	/* Cleanup. */
	EvalPlanQualEnd(&epqstate);
}
3169 :
3170 : /*
3171 : * Try to find a tuple received from the publication side (in 'remoteslot') in
3172 : * the corresponding local relation using either replica identity index,
3173 : * primary key, index or if needed, sequential scan.
3174 : *
3175 : * Local tuple, if found, is returned in '*localslot'.
3176 : */
3177 : static bool
3178 72261 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
3179 : LogicalRepRelation *remoterel,
3180 : Oid localidxoid,
3181 : TupleTableSlot *remoteslot,
3182 : TupleTableSlot **localslot)
3183 : {
3184 72261 : EState *estate = edata->estate;
3185 : bool found;
3186 :
3187 : /*
3188 : * Regardless of the top-level operation, we're performing a read here, so
3189 : * check for SELECT privileges.
3190 : */
3191 72261 : TargetPrivilegesCheck(localrel, ACL_SELECT);
3192 :
3193 72256 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3194 :
3195 : Assert(OidIsValid(localidxoid) ||
3196 : (remoterel->replident == REPLICA_IDENTITY_FULL));
3197 :
3198 72256 : if (OidIsValid(localidxoid))
3199 : {
3200 : #ifdef USE_ASSERT_CHECKING
3201 : Relation idxrel = index_open(localidxoid, AccessShareLock);
3202 :
3203 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3204 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
3205 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
3206 : IsIndexUsableForReplicaIdentityFull(idxrel,
3207 : edata->targetRel->attrmap)));
3208 : index_close(idxrel, AccessShareLock);
3209 : #endif
3210 :
3211 72105 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
3212 : LockTupleExclusive,
3213 : remoteslot, *localslot);
3214 : }
3215 : else
3216 151 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
3217 : remoteslot, *localslot);
3218 :
3219 72256 : return found;
3220 : }
3221 :
3222 : /*
3223 : * Determine whether the index can reliably locate the deleted tuple in the
3224 : * local relation.
3225 : *
3226 : * An index may exclude deleted tuples if it was re-indexed or re-created during
3227 : * change application. Therefore, an index is considered usable only if the
3228 : * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3229 : * index tuple's xmin. This ensures that any tuples deleted prior to the index
3230 : * creation or re-indexing are not relevant for conflict detection in the
3231 : * current apply worker.
3232 : *
3233 : * Note that indexes may also be excluded if they were modified by other DDL
3234 : * operations, such as ALTER INDEX. However, this is acceptable, as the
3235 : * likelihood of such DDL changes coinciding with the need to scan dead
3236 : * tuples for the update_deleted is low.
3237 : */
3238 : static bool
3239 1 : IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
3240 : TransactionId conflict_detection_xmin)
3241 : {
3242 : HeapTuple index_tuple;
3243 : TransactionId index_xmin;
3244 :
3245 1 : index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
3246 :
3247 1 : if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3248 0 : elog(ERROR, "cache lookup failed for index %u", localindexoid);
3249 :
3250 : /*
3251 : * No need to check for a frozen transaction ID, as
3252 : * TransactionIdPrecedes() manages it internally, treating it as falling
3253 : * behind the conflict_detection_xmin.
3254 : */
3255 1 : index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
3256 :
3257 1 : ReleaseSysCache(index_tuple);
3258 :
3259 1 : return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
3260 : }
3261 :
/*
 * Attempts to locate a deleted tuple in the local relation that matches the
 * values of the tuple received from the publication side (in 'remoteslot').
 * The search is performed using either the replica identity index, primary
 * key, other available index, or a sequential scan if necessary.
 *
 * Returns true if the deleted tuple is found.  If found, the transaction ID,
 * origin, and commit timestamp of the deletion are stored in '*delete_xid',
 * '*delete_origin', and '*delete_time' respectively.
 *
 * Returns false immediately when dead-tuple retention or commit-timestamp
 * tracking is off, since update_deleted cannot be detected then.
 */
static bool
FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
						   TupleTableSlot *remoteslot,
						   TransactionId *delete_xid, ReplOriginId *delete_origin,
						   TimestampTz *delete_time)
{
	TransactionId oldestxmin;

	/*
	 * Return false if either dead tuples are not retained or commit timestamp
	 * data is not available.
	 */
	if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
		return false;

	/*
	 * For conflict detection, we use the leader worker's
	 * oldest_nonremovable_xid value instead of invoking
	 * GetOldestNonRemovableTransactionId() or using the conflict detection
	 * slot's xmin.  The oldest_nonremovable_xid acts as a threshold to
	 * identify tuples that were recently deleted.  These deleted tuples are no
	 * longer visible to concurrent transactions.  However, if a remote update
	 * matches such a tuple, we log an update_deleted conflict.
	 *
	 * While GetOldestNonRemovableTransactionId() and slot.xmin may return
	 * transaction IDs older than oldest_nonremovable_xid, for our current
	 * purpose, it is acceptable to treat tuples deleted by transactions prior
	 * to oldest_nonremovable_xid as update_missing conflicts.
	 */
	if (am_leader_apply_worker())
	{
		oldestxmin = MyLogicalRepWorker->oldest_nonremovable_xid;
	}
	else
	{
		LogicalRepWorker *leader;

		/*
		 * Obtain the information from the leader apply worker as only the
		 * leader manages oldest_nonremovable_xid (see
		 * maybe_advance_nonremovable_xid() for details).
		 */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
		leader = logicalrep_worker_find(WORKERTYPE_APPLY,
										MyLogicalRepWorker->subid, InvalidOid,
										false);
		if (!leader)
		{
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("could not detect conflict as the leader apply worker has exited")));
		}

		/* Spinlock protects the leader's xid field; hold it briefly. */
		SpinLockAcquire(&leader->relmutex);
		oldestxmin = leader->oldest_nonremovable_xid;
		SpinLockRelease(&leader->relmutex);
		LWLockRelease(LogicalRepWorkerLock);
	}

	/*
	 * Return false if the leader apply worker has stopped retaining
	 * information for detecting conflicts.  This implies that update_deleted
	 * can no longer be reliably detected.
	 */
	if (!TransactionIdIsValid(oldestxmin))
		return false;

	if (OidIsValid(localidxoid) &&
		IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
		return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
												   remoteslot, oldestxmin,
												   delete_xid, delete_origin,
												   delete_time);
	else
		return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
											   oldestxmin, delete_xid,
											   delete_origin, delete_time);
}
3350 :
3351 : /*
3352 : * This handles insert, update, delete on a partitioned table.
3353 : */
3354 : static void
3355 97 : apply_handle_tuple_routing(ApplyExecutionData *edata,
3356 : TupleTableSlot *remoteslot,
3357 : LogicalRepTupleData *newtup,
3358 : CmdType operation)
3359 : {
3360 97 : EState *estate = edata->estate;
3361 97 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3362 97 : ResultRelInfo *relinfo = edata->targetRelInfo;
3363 97 : Relation parentrel = relinfo->ri_RelationDesc;
3364 : ModifyTableState *mtstate;
3365 : PartitionTupleRouting *proute;
3366 : ResultRelInfo *partrelinfo;
3367 : Relation partrel;
3368 : TupleTableSlot *remoteslot_part;
3369 : TupleConversionMap *map;
3370 : MemoryContext oldctx;
3371 97 : LogicalRepRelMapEntry *part_entry = NULL;
3372 97 : AttrMap *attrmap = NULL;
3373 :
3374 : /* ModifyTableState is needed for ExecFindPartition(). */
3375 97 : edata->mtstate = mtstate = makeNode(ModifyTableState);
3376 97 : mtstate->ps.plan = NULL;
3377 97 : mtstate->ps.state = estate;
3378 97 : mtstate->operation = operation;
3379 97 : mtstate->resultRelInfo = relinfo;
3380 :
3381 : /* ... as is PartitionTupleRouting. */
3382 97 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3383 :
3384 : /*
3385 : * Find the partition to which the "search tuple" belongs.
3386 : */
3387 : Assert(remoteslot != NULL);
3388 97 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3389 97 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3390 : remoteslot, estate);
3391 : Assert(partrelinfo != NULL);
3392 97 : partrel = partrelinfo->ri_RelationDesc;
3393 :
3394 : /*
3395 : * Check for supported relkind. We need this since partitions might be of
3396 : * unsupported relkinds; and the set of partitions can change, so checking
3397 : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3398 : */
3399 97 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3400 97 : relmapentry->remoterel.relkind,
3401 97 : get_namespace_name(RelationGetNamespace(partrel)),
3402 97 : RelationGetRelationName(partrel));
3403 :
3404 : /*
3405 : * To perform any of the operations below, the tuple must match the
3406 : * partition's rowtype. Convert if needed or just copy, using a dedicated
3407 : * slot to store the tuple in any case.
3408 : */
3409 97 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3410 97 : if (remoteslot_part == NULL)
3411 64 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3412 97 : map = ExecGetRootToChildMap(partrelinfo, estate);
3413 97 : if (map != NULL)
3414 : {
3415 33 : attrmap = map->attrMap;
3416 33 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3417 : remoteslot_part);
3418 : }
3419 : else
3420 : {
3421 64 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3422 64 : slot_getallattrs(remoteslot_part);
3423 : }
3424 97 : MemoryContextSwitchTo(oldctx);
3425 :
3426 : /* Check if we can do the update or delete on the leaf partition. */
3427 97 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
3428 : {
3429 30 : part_entry = logicalrep_partition_open(relmapentry, partrel,
3430 : attrmap);
3431 30 : check_relation_updatable(part_entry);
3432 : }
3433 :
3434 97 : switch (operation)
3435 : {
3436 67 : case CMD_INSERT:
3437 67 : apply_handle_insert_internal(edata, partrelinfo,
3438 : remoteslot_part);
3439 44 : break;
3440 :
3441 17 : case CMD_DELETE:
3442 17 : apply_handle_delete_internal(edata, partrelinfo,
3443 : remoteslot_part,
3444 : part_entry->localindexoid);
3445 17 : break;
3446 :
3447 13 : case CMD_UPDATE:
3448 :
3449 : /*
3450 : * For UPDATE, depending on whether or not the updated tuple
3451 : * satisfies the partition's constraint, perform a simple UPDATE
3452 : * of the partition or move the updated tuple into a different
3453 : * suitable partition.
3454 : */
3455 : {
3456 : TupleTableSlot *localslot;
3457 : ResultRelInfo *partrelinfo_new;
3458 : Relation partrel_new;
3459 : bool found;
3460 : EPQState epqstate;
3461 13 : ConflictTupleInfo conflicttuple = {0};
3462 :
3463 : /* Get the matching local tuple from the partition. */
3464 13 : found = FindReplTupleInLocalRel(edata, partrel,
3465 : &part_entry->remoterel,
3466 : part_entry->localindexoid,
3467 : remoteslot_part, &localslot);
3468 13 : if (!found)
3469 : {
3470 : ConflictType type;
3471 2 : TupleTableSlot *newslot = localslot;
3472 :
3473 : /*
3474 : * Detecting whether the tuple was recently deleted or
3475 : * never existed is crucial to avoid misleading the user
3476 : * during conflict handling.
3477 : */
3478 2 : if (FindDeletedTupleInLocalRel(partrel,
3479 : part_entry->localindexoid,
3480 : remoteslot_part,
3481 : &conflicttuple.xmin,
3482 : &conflicttuple.origin,
3483 0 : &conflicttuple.ts) &&
3484 0 : conflicttuple.origin != replorigin_xact_state.origin)
3485 0 : type = CT_UPDATE_DELETED;
3486 : else
3487 2 : type = CT_UPDATE_MISSING;
3488 :
3489 : /* Store the new tuple for conflict reporting */
3490 2 : slot_store_data(newslot, part_entry, newtup);
3491 :
3492 : /*
3493 : * The tuple to be updated could not be found or was
3494 : * deleted. Do nothing except for emitting a log message.
3495 : */
3496 2 : ReportApplyConflict(estate, partrelinfo, LOG,
3497 : type, remoteslot_part, newslot,
3498 2 : list_make1(&conflicttuple));
3499 :
3500 2 : return;
3501 : }
3502 :
3503 : /*
3504 : * Report the conflict if the tuple was modified by a
3505 : * different origin.
3506 : */
3507 11 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3508 : &conflicttuple.origin,
3509 1 : &conflicttuple.ts) &&
3510 1 : conflicttuple.origin != replorigin_xact_state.origin)
3511 : {
3512 : TupleTableSlot *newslot;
3513 :
3514 : /* Store the new tuple for conflict reporting */
3515 1 : newslot = table_slot_create(partrel, &estate->es_tupleTable);
3516 1 : slot_store_data(newslot, part_entry, newtup);
3517 :
3518 1 : conflicttuple.slot = localslot;
3519 :
3520 1 : ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3521 : remoteslot_part, newslot,
3522 1 : list_make1(&conflicttuple));
3523 : }
3524 :
3525 : /*
3526 : * Apply the update to the local tuple, putting the result in
3527 : * remoteslot_part.
3528 : */
3529 11 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3530 11 : slot_modify_data(remoteslot_part, localslot, part_entry,
3531 : newtup);
3532 11 : MemoryContextSwitchTo(oldctx);
3533 :
3534 11 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3535 :
3536 : /*
3537 : * Does the updated tuple still satisfy the current
3538 : * partition's constraint?
3539 : */
3540 22 : if (!partrel->rd_rel->relispartition ||
3541 11 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3542 : false))
3543 : {
3544 : /*
3545 : * Yes, so simply UPDATE the partition. We don't call
3546 : * apply_handle_update_internal() here, which would
3547 : * normally do the following work, to avoid repeating some
3548 : * work already done above to find the local tuple in the
3549 : * partition.
3550 : */
3551 10 : InitConflictIndexes(partrelinfo);
3552 :
3553 10 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3554 10 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3555 : ACL_UPDATE);
3556 10 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3557 : localslot, remoteslot_part);
3558 : }
3559 : else
3560 : {
3561 : /* Move the tuple into the new partition. */
3562 :
3563 : /*
3564 : * New partition will be found using tuple routing, which
3565 : * can only occur via the parent table. We might need to
3566 : * convert the tuple to the parent's rowtype. Note that
3567 : * this is the tuple found in the partition, not the
3568 : * original search tuple received by this function.
3569 : */
3570 1 : if (map)
3571 : {
3572 : TupleConversionMap *PartitionToRootMap =
3573 1 : convert_tuples_by_name(RelationGetDescr(partrel),
3574 : RelationGetDescr(parentrel));
3575 :
3576 : remoteslot =
3577 1 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3578 : remoteslot_part, remoteslot);
3579 : }
3580 : else
3581 : {
3582 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3583 0 : slot_getallattrs(remoteslot);
3584 : }
3585 :
3586 : /* Find the new partition. */
3587 1 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3588 1 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3589 : proute, remoteslot,
3590 : estate);
3591 1 : MemoryContextSwitchTo(oldctx);
3592 : Assert(partrelinfo_new != partrelinfo);
3593 1 : partrel_new = partrelinfo_new->ri_RelationDesc;
3594 :
3595 : /* Check that new partition also has supported relkind. */
3596 1 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3597 1 : relmapentry->remoterel.relkind,
3598 1 : get_namespace_name(RelationGetNamespace(partrel_new)),
3599 1 : RelationGetRelationName(partrel_new));
3600 :
3601 : /* DELETE old tuple found in the old partition. */
3602 1 : EvalPlanQualSetSlot(&epqstate, localslot);
3603 1 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3604 1 : ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3605 :
3606 : /* INSERT new tuple into the new partition. */
3607 :
3608 : /*
3609 : * Convert the replacement tuple to match the destination
3610 : * partition rowtype.
3611 : */
3612 1 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3613 1 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3614 1 : if (remoteslot_part == NULL)
3615 1 : remoteslot_part = table_slot_create(partrel_new,
3616 : &estate->es_tupleTable);
3617 1 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
3618 1 : if (map != NULL)
3619 : {
3620 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3621 : remoteslot,
3622 : remoteslot_part);
3623 : }
3624 : else
3625 : {
3626 1 : remoteslot_part = ExecCopySlot(remoteslot_part,
3627 : remoteslot);
3628 1 : slot_getallattrs(remoteslot);
3629 : }
3630 1 : MemoryContextSwitchTo(oldctx);
3631 1 : apply_handle_insert_internal(edata, partrelinfo_new,
3632 : remoteslot_part);
3633 : }
3634 :
3635 11 : EvalPlanQualEnd(&epqstate);
3636 : }
3637 11 : break;
3638 :
3639 0 : default:
3640 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3641 : break;
3642 : }
3643 : }
3644 :
/*
 * Handle TRUNCATE message.
 *
 * Opens every replicated relation named in the message with
 * AccessExclusiveLock, expands partitioned tables to include all their
 * partitions, and truncates the whole set in a single
 * ExecuteTruncateGuts() call.  Locks are retained until the end of the
 * transaction (relations are closed below with NoLock).
 *
 * TODO: FDW support
 */
static void
apply_handle_truncate(StringInfo s)
{
	bool		cascade = false;
	bool		restart_seqs = false;
	List	   *remote_relids = NIL;	/* relation OIDs from the message */
	List	   *remote_rels = NIL;	/* LogicalRepRelMapEntry per target rel */
	List	   *rels = NIL;			/* every local Relation to truncate */
	List	   *part_rels = NIL;	/* partitions we opened with table_open */
	List	   *relids = NIL;		/* OIDs matching 'rels', used for dedup */
	List	   *relids_logged = NIL;	/* subset that is logically logged */
	ListCell   *lc;
	LOCKMODE	lockmode = AccessExclusiveLock;

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
		return;

	begin_replication_step();

	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

	foreach(lc, remote_relids)
	{
		LogicalRepRelId relid = lfirst_oid(lc);
		LogicalRepRelMapEntry *rel;

		rel = logicalrep_rel_open(relid, lockmode);
		if (!should_apply_changes_for_rel(rel))
		{
			/*
			 * The relation can't become interesting in the middle of the
			 * transaction so it's safe to unlock it.
			 */
			logicalrep_rel_close(rel, lockmode);
			continue;
		}

		remote_rels = lappend(remote_rels, rel);
		TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
		rels = lappend(rels, rel->localrel);
		relids = lappend_oid(relids, rel->localreloid);
		if (RelationIsLogicallyLogged(rel->localrel))
			relids_logged = lappend_oid(relids_logged, rel->localreloid);

		/*
		 * Truncate partitions if we got a message to truncate a partitioned
		 * table.
		 */
		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		{
			ListCell   *child;
			List	   *children = find_all_inheritors(rel->localreloid,
													   lockmode,
													   NULL);

			foreach(child, children)
			{
				Oid			childrelid = lfirst_oid(child);
				Relation	childrel;

				/* Skip partitions already listed (e.g. named explicitly). */
				if (list_member_oid(relids, childrelid))
					continue;

				/* find_all_inheritors already got lock */
				childrel = table_open(childrelid, NoLock);

				/*
				 * Ignore temp tables of other backends. See similar code in
				 * ExecuteTruncate().
				 */
				if (RELATION_IS_OTHER_TEMP(childrel))
				{
					table_close(childrel, lockmode);
					continue;
				}

				TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
				rels = lappend(rels, childrel);
				part_rels = lappend(part_rels, childrel);
				relids = lappend_oid(relids, childrelid);
				/* Log this relation only if needed for logical decoding */
				if (RelationIsLogicallyLogged(childrel))
					relids_logged = lappend_oid(relids_logged, childrelid);
			}
		}
	}

	/*
	 * Even if we used CASCADE on the upstream primary we explicitly default
	 * to replaying changes without further cascading. This might be later
	 * changeable with a user specified option.
	 *
	 * MySubscription->runasowner tells us whether we want to execute
	 * replication actions as the subscription owner; the last argument to
	 * TruncateGuts tells it whether we want to switch to the table owner.
	 * Those are exactly opposite conditions.
	 */
	ExecuteTruncateGuts(rels,
						relids,
						relids_logged,
						DROP_RESTRICT,
						restart_seqs,
						!MySubscription->runasowner);

	/* Close the relations opened above; locks are held until commit. */
	foreach(lc, remote_rels)
	{
		LogicalRepRelMapEntry *rel = lfirst(lc);

		logicalrep_rel_close(rel, NoLock);
	}
	foreach(lc, part_rels)
	{
		Relation	rel = lfirst(lc);

		table_close(rel, NoLock);
	}

	end_replication_step();
}
3773 :
3774 :
3775 : /*
3776 : * Logical replication protocol message dispatcher.
3777 : */
3778 : void
3779 337683 : apply_dispatch(StringInfo s)
3780 : {
3781 337683 : LogicalRepMsgType action = pq_getmsgbyte(s);
3782 : LogicalRepMsgType saved_command;
3783 :
3784 : /*
3785 : * Set the current command being applied. Since this function can be
3786 : * called recursively when applying spooled changes, save the current
3787 : * command.
3788 : */
3789 337683 : saved_command = apply_error_callback_arg.command;
3790 337683 : apply_error_callback_arg.command = action;
3791 :
3792 337683 : switch (action)
3793 : {
3794 496 : case LOGICAL_REP_MSG_BEGIN:
3795 496 : apply_handle_begin(s);
3796 496 : break;
3797 :
3798 442 : case LOGICAL_REP_MSG_COMMIT:
3799 442 : apply_handle_commit(s);
3800 442 : break;
3801 :
3802 186254 : case LOGICAL_REP_MSG_INSERT:
3803 186254 : apply_handle_insert(s);
3804 186207 : break;
3805 :
3806 66165 : case LOGICAL_REP_MSG_UPDATE:
3807 66165 : apply_handle_update(s);
3808 66156 : break;
3809 :
3810 81936 : case LOGICAL_REP_MSG_DELETE:
3811 81936 : apply_handle_delete(s);
3812 81936 : break;
3813 :
3814 20 : case LOGICAL_REP_MSG_TRUNCATE:
3815 20 : apply_handle_truncate(s);
3816 20 : break;
3817 :
3818 480 : case LOGICAL_REP_MSG_RELATION:
3819 480 : apply_handle_relation(s);
3820 480 : break;
3821 :
3822 18 : case LOGICAL_REP_MSG_TYPE:
3823 18 : apply_handle_type(s);
3824 18 : break;
3825 :
3826 7 : case LOGICAL_REP_MSG_ORIGIN:
3827 7 : apply_handle_origin(s);
3828 7 : break;
3829 :
3830 0 : case LOGICAL_REP_MSG_MESSAGE:
3831 :
3832 : /*
3833 : * Logical replication does not use generic logical messages yet.
3834 : * Although, it could be used by other applications that use this
3835 : * output plugin.
3836 : */
3837 0 : break;
3838 :
3839 845 : case LOGICAL_REP_MSG_STREAM_START:
3840 845 : apply_handle_stream_start(s);
3841 845 : break;
3842 :
3843 844 : case LOGICAL_REP_MSG_STREAM_STOP:
3844 844 : apply_handle_stream_stop(s);
3845 842 : break;
3846 :
3847 38 : case LOGICAL_REP_MSG_STREAM_ABORT:
3848 38 : apply_handle_stream_abort(s);
3849 38 : break;
3850 :
3851 61 : case LOGICAL_REP_MSG_STREAM_COMMIT:
3852 61 : apply_handle_stream_commit(s);
3853 59 : break;
3854 :
3855 17 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3856 17 : apply_handle_begin_prepare(s);
3857 17 : break;
3858 :
3859 16 : case LOGICAL_REP_MSG_PREPARE:
3860 16 : apply_handle_prepare(s);
3861 15 : break;
3862 :
3863 22 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
3864 22 : apply_handle_commit_prepared(s);
3865 22 : break;
3866 :
3867 5 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
3868 5 : apply_handle_rollback_prepared(s);
3869 5 : break;
3870 :
3871 17 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3872 17 : apply_handle_stream_prepare(s);
3873 13 : break;
3874 :
3875 0 : default:
3876 0 : ereport(ERROR,
3877 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3878 : errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3879 : }
3880 :
3881 : /* Reset the current command */
3882 337618 : apply_error_callback_arg.command = saved_command;
3883 337618 : }
3884 :
3885 : /*
3886 : * Figure out which write/flush positions to report to the walsender process.
3887 : *
3888 : * We can't simply report back the last LSN the walsender sent us because the
3889 : * local transaction might not yet be flushed to disk locally. Instead we
3890 : * build a list that associates local with remote LSNs for every commit. When
3891 : * reporting back the flush position to the sender we iterate that list and
3892 : * check which entries on it are already locally flushed. Those we can report
3893 : * as having been flushed.
3894 : *
3895 : * The have_pending_txes is true if there are outstanding transactions that
3896 : * need to be flushed.
3897 : */
3898 : static void
3899 34261 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3900 : bool *have_pending_txes)
3901 : {
3902 : dlist_mutable_iter iter;
3903 34261 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3904 :
3905 34261 : *write = InvalidXLogRecPtr;
3906 34261 : *flush = InvalidXLogRecPtr;
3907 :
3908 34775 : dlist_foreach_modify(iter, &lsn_mapping)
3909 : {
3910 5053 : FlushPosition *pos =
3911 : dlist_container(FlushPosition, node, iter.cur);
3912 :
3913 5053 : *write = pos->remote_end;
3914 :
3915 5053 : if (pos->local_end <= local_flush)
3916 : {
3917 514 : *flush = pos->remote_end;
3918 514 : dlist_delete(iter.cur);
3919 514 : pfree(pos);
3920 : }
3921 : else
3922 : {
3923 : /*
3924 : * Don't want to uselessly iterate over the rest of the list which
3925 : * could potentially be long. Instead get the last element and
3926 : * grab the write position from there.
3927 : */
3928 4539 : pos = dlist_tail_element(FlushPosition, node,
3929 : &lsn_mapping);
3930 4539 : *write = pos->remote_end;
3931 4539 : *have_pending_txes = true;
3932 4539 : return;
3933 : }
3934 : }
3935 :
3936 29722 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3937 : }
3938 :
3939 : /*
3940 : * Store current remote/local lsn pair in the tracking list.
3941 : */
3942 : void
3943 552 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3944 : {
3945 : FlushPosition *flushpos;
3946 :
3947 : /*
3948 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3949 : * by the leader apply worker.
3950 : */
3951 552 : if (am_parallel_apply_worker())
3952 19 : return;
3953 :
3954 : /* Need to do this in permanent context */
3955 533 : MemoryContextSwitchTo(ApplyContext);
3956 :
3957 : /* Track commit lsn */
3958 533 : flushpos = palloc_object(FlushPosition);
3959 533 : flushpos->local_end = local_lsn;
3960 533 : flushpos->remote_end = remote_lsn;
3961 :
3962 533 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3963 533 : MemoryContextSwitchTo(ApplyMessageContext);
3964 : }
3965 :
3966 :
3967 : /* Update statistics of the worker. */
3968 : static void
3969 197860 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3970 : {
3971 197860 : MyLogicalRepWorker->last_lsn = last_lsn;
3972 197860 : MyLogicalRepWorker->last_send_time = send_time;
3973 197860 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3974 197860 : if (reply)
3975 : {
3976 1809 : MyLogicalRepWorker->reply_lsn = last_lsn;
3977 1809 : MyLogicalRepWorker->reply_time = send_time;
3978 : }
3979 197860 : }
3980 :
/*
 * Apply main loop.
 *
 * Receives replication protocol messages on LogRepWorkerWalRcvConn starting
 * from 'last_received' and dispatches them for application, periodically
 * sending Standby Status Update feedback to the publisher.  Loops until the
 * publisher ends the data stream.
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;
	TimeLineID	tli;			/* receives next timeline from endstreaming */
	ErrorContextCallback errcallback;
	RetainDeadTuplesData rdt_data = {0};

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/*
	 * This memory context is used for per-stream data when the streaming mode
	 * is enabled. This context is reset on each stream stop.
	 */
	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
													"LogicalStreamingContext",
													ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/*
	 * Push apply error context callback. Fields will be filled while applying
	 * a change.
	 */
	errcallback.callback = apply_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;
	apply_error_context_stack = error_context_stack;

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					/* No more data available right now. */
					break;
				}
				else if (len < 0)
				{
					/* Negative length means the upstream closed the stream. */
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					if (ConfigReloadPending)
					{
						ConfigReloadPending = false;
						ProcessConfigFile(PGC_SIGHUP);
					}

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					rdt_data.last_recv_time = last_recv_timestamp;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					/* Wrap the received buffer without copying it. */
					initReadOnlyStringInfo(&s, buf, len);

					c = pq_getmsgbyte(&s);

					if (c == PqReplMsg_WALData)
					{
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						/* Track the furthest LSN seen so far. */
						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						/* Apply the change(s) carried in this message. */
						apply_dispatch(&s);

						maybe_advance_nonremovable_xid(&rdt_data, false);
					}
					else if (c == PqReplMsg_Keepalive)
					{
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						/* Answer immediately if the sender asked for it. */
						send_feedback(last_received, reply_requested, false);

						maybe_advance_nonremovable_xid(&rdt_data, false);

						UpdateWorkerStats(last_received, timestamp, true);
					}
					else if (c == PqReplMsg_PrimaryStatusUpdate)
					{
						rdt_data.remote_lsn = pq_getmsgint64(&s);
						rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
						rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
						rdt_data.reply_time = pq_getmsgint64(&s);

						/*
						 * This should never happen, see
						 * ProcessStandbyPSRequestMessage. But if it happens
						 * due to a bug, we don't want to proceed as it can
						 * incorrectly advance oldest_nonremovable_xid.
						 */
						if (!XLogRecPtrIsValid(rdt_data.remote_lsn))
							elog(ERROR, "cannot get the latest WAL position from the publisher");

						maybe_advance_nonremovable_xid(&rdt_data, true);

						UpdateWorkerStats(last_received, rdt_data.reply_time, false);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		/* Reset the timestamp if no message was received */
		rdt_data.last_recv_time = 0;

		maybe_advance_nonremovable_xid(&rdt_data, false);

		if (!in_remote_transaction && !in_streamed_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/*
			 * Process any relations that are being synchronized in parallel
			 * and any newly added tables or sequences.
			 */
			ProcessSyncingRelations(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextReset(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
			break;

		/*
		 * Wait for more data or latch. If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message). Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		/*
		 * Ensure to wake up when it's possible to advance the non-removable
		 * transaction ID, or when the retention duration may have exceeded
		 * max_retention_duration.
		 */
		if (MySubscription->retentionactive)
		{
			if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
				rdt_data.xid_advance_interval)
				wait_time = Min(wait_time, rdt_data.xid_advance_interval);
			else if (MySubscription->maxretention > 0)
				wait_time = Min(wait_time, MySubscription->maxretention);
		}

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new. If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server. Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the primary anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from primary has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errcode(ERRCODE_CONNECTION_FAILURE),
							 errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);

			maybe_advance_nonremovable_xid(&rdt_data, false);

			/*
			 * Force reporting to ensure long idle periods don't lead to
			 * arbitrarily delayed stats. Stats can only be reported outside
			 * of (implicit or explicit) transactions. That shouldn't lead to
			 * stats being delayed for long, because transactions are either
			 * sent as a whole on commit or streamed. Streamed transactions
			 * are spilled to disk and applied on commit.
			 */
			if (!IsTransactionState())
				pgstat_report_stat(true);
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;
	apply_error_context_stack = error_context_stack;

	/* All done */
	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
4293 :
/*
 * Send a Standby Status Update message to server.
 *
 * 'recvpos' is the latest LSN we've received data to, force is set if we need
 * to send a response to avoid timeouts.
 *
 * The reported write/flush positions are derived from the lsn_mapping list
 * via get_flush_position(); reported positions never move backwards (they
 * are clamped against the last values sent).
 */
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
	/* Reusable message buffer, allocated once in ApplyContext. */
	static StringInfo reply_message = NULL;
	static TimestampTz send_time = 0;

	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
	static XLogRecPtr last_writepos = InvalidXLogRecPtr;

	/*
	 * NOTE(review): last_flushpos is referenced below but not declared here;
	 * presumably a file-scope static shared with other code — confirm.
	 */

	XLogRecPtr	writepos;
	XLogRecPtr	flushpos;
	TimestampTz now;
	bool		have_pending_txes;

	/*
	 * If the user doesn't want status to be reported to the publisher, be
	 * sure to exit before doing anything at all.
	 */
	if (!force && wal_receiver_status_interval <= 0)
		return;

	/* It's legal to not pass a recvpos */
	if (recvpos < last_recvpos)
		recvpos = last_recvpos;

	get_flush_position(&writepos, &flushpos, &have_pending_txes);

	/*
	 * No outstanding transactions to flush, we can report the latest received
	 * position. This is important for synchronous replication.
	 */
	if (!have_pending_txes)
		flushpos = writepos = recvpos;

	/* Never report positions older than what we already reported. */
	if (writepos < last_writepos)
		writepos = last_writepos;

	if (flushpos < last_flushpos)
		flushpos = last_flushpos;

	now = GetCurrentTimestamp();

	/* if we've already reported everything we're good */
	if (!force &&
		writepos == last_writepos &&
		flushpos == last_flushpos &&
		!TimestampDifferenceExceeds(send_time, now,
									wal_receiver_status_interval * 1000))
		return;
	send_time = now;

	if (!reply_message)
	{
		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

		reply_message = makeStringInfo();
		MemoryContextSwitchTo(oldctx);
	}
	else
		resetStringInfo(reply_message);

	pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
	pq_sendint64(reply_message, recvpos);	/* write */
	pq_sendint64(reply_message, flushpos);	/* flush */
	pq_sendint64(reply_message, writepos);	/* apply */
	pq_sendint64(reply_message, now);	/* sendTime */
	pq_sendbyte(reply_message, requestReply);	/* replyRequested */

	elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
		 force,
		 LSN_FORMAT_ARGS(recvpos),
		 LSN_FORMAT_ARGS(writepos),
		 LSN_FORMAT_ARGS(flushpos));

	walrcv_send(LogRepWorkerWalRcvConn,
				reply_message->data, reply_message->len);

	/* Remember what we reported, for the clamps and dedup check above. */
	if (recvpos > last_recvpos)
		last_recvpos = recvpos;
	if (writepos > last_writepos)
		last_writepos = writepos;
	if (flushpos > last_flushpos)
		last_flushpos = flushpos;
}
4384 :
4385 : /*
4386 : * Attempt to advance the non-removable transaction ID.
4387 : *
4388 : * See comments atop worker.c for details.
4389 : */
4390 : static void
4391 230239 : maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
4392 : bool status_received)
4393 : {
4394 230239 : if (!can_advance_nonremovable_xid(rdt_data))
4395 215218 : return;
4396 :
4397 15021 : process_rdt_phase_transition(rdt_data, status_received);
4398 : }
4399 :
4400 : /*
4401 : * Preliminary check to determine if advancing the non-removable transaction ID
4402 : * is allowed.
4403 : */
4404 : static bool
4405 230239 : can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
4406 : {
4407 : /*
4408 : * It is sufficient to manage non-removable transaction ID for a
4409 : * subscription by the main apply worker to detect update_deleted reliably
4410 : * even for table sync or parallel apply workers.
4411 : */
4412 230239 : if (!am_leader_apply_worker())
4413 372 : return false;
4414 :
4415 : /* No need to advance if retaining dead tuples is not required */
4416 229867 : if (!MySubscription->retaindeadtuples)
4417 214846 : return false;
4418 :
4419 15021 : return true;
4420 : }
4421 :
4422 : /*
4423 : * Process phase transitions during the non-removable transaction ID
4424 : * advancement. See comments atop worker.c for details of the transition.
4425 : */
4426 : static void
4427 26156 : process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
4428 : bool status_received)
4429 : {
4430 26156 : switch (rdt_data->phase)
4431 : {
4432 186 : case RDT_GET_CANDIDATE_XID:
4433 186 : get_candidate_xid(rdt_data);
4434 186 : break;
4435 11053 : case RDT_REQUEST_PUBLISHER_STATUS:
4436 11053 : request_publisher_status(rdt_data);
4437 11053 : break;
4438 14797 : case RDT_WAIT_FOR_PUBLISHER_STATUS:
4439 14797 : wait_for_publisher_status(rdt_data, status_received);
4440 14797 : break;
4441 118 : case RDT_WAIT_FOR_LOCAL_FLUSH:
4442 118 : wait_for_local_flush(rdt_data);
4443 118 : break;
4444 1 : case RDT_STOP_CONFLICT_INFO_RETENTION:
4445 1 : stop_conflict_info_retention(rdt_data);
4446 1 : break;
4447 1 : case RDT_RESUME_CONFLICT_INFO_RETENTION:
4448 1 : resume_conflict_info_retention(rdt_data);
4449 0 : break;
4450 : }
4451 26155 : }
4452 :
/*
 * Workhorse for the RDT_GET_CANDIDATE_XID phase.
 *
 * Selects the oldest transaction ID still active in this database as the
 * candidate for a new oldest_nonremovable_xid.  If the candidate differs
 * from the current value, moves to RDT_REQUEST_PUBLISHER_STATUS; otherwise
 * only the polling interval is adjusted.
 */
static void
get_candidate_xid(RetainDeadTuplesData *rdt_data)
{
	TransactionId oldest_running_xid;
	TimestampTz now;

	/*
	 * Use last_recv_time when applying changes in the loop to avoid
	 * unnecessary system time retrieval. If last_recv_time is not available,
	 * obtain the current timestamp.
	 */
	now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();

	/*
	 * Compute the candidate_xid and request the publisher status at most once
	 * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
	 * details on how this value is dynamically adjusted. This is to avoid
	 * using CPU and network resources without making much progress.
	 */
	if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
									rdt_data->xid_advance_interval))
		return;

	/*
	 * Immediately update the timer, even if the function returns later
	 * without setting candidate_xid due to inactivity on the subscriber. This
	 * avoids frequent calls to GetOldestActiveTransactionId.
	 */
	rdt_data->candidate_xid_time = now;

	/*
	 * Consider transactions in the current database, as only dead tuples from
	 * this database are required for conflict detection.
	 */
	oldest_running_xid = GetOldestActiveTransactionId(false, false);

	/*
	 * Oldest active transaction ID (oldest_running_xid) can't be behind any
	 * of its previously computed value.
	 */
	Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
										 oldest_running_xid));

	/* Return if the oldest_nonremovable_xid cannot be advanced */
	if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
							oldest_running_xid))
	{
		/* No progress possible; back off the polling interval. */
		adjust_xid_advance_interval(rdt_data, false);
		return;
	}

	/* A new candidate was found; reset the interval to its minimum. */
	adjust_xid_advance_interval(rdt_data, true);

	rdt_data->candidate_xid = oldest_running_xid;
	rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;

	/* process the next phase */
	process_rdt_phase_transition(rdt_data, false);
}
4515 :
4516 : /*
4517 : * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4518 : */
4519 : static void
4520 11053 : request_publisher_status(RetainDeadTuplesData *rdt_data)
4521 : {
4522 : static StringInfo request_message = NULL;
4523 :
4524 11053 : if (!request_message)
4525 : {
4526 12 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4527 :
4528 12 : request_message = makeStringInfo();
4529 12 : MemoryContextSwitchTo(oldctx);
4530 : }
4531 : else
4532 11041 : resetStringInfo(request_message);
4533 :
4534 : /*
4535 : * Send the current time to update the remote walsender's latest reply
4536 : * message received time.
4537 : */
4538 11053 : pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
4539 11053 : pq_sendint64(request_message, GetCurrentTimestamp());
4540 :
4541 11053 : elog(DEBUG2, "sending publisher status request message");
4542 :
4543 : /* Send a request for the publisher status */
4544 11053 : walrcv_send(LogRepWorkerWalRcvConn,
4545 : request_message->data, request_message->len);
4546 :
4547 11053 : rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
4548 :
4549 : /*
4550 : * Skip calling maybe_advance_nonremovable_xid() since further transition
4551 : * is possible only once we receive the publisher status message.
4552 : */
4553 11053 : }
4554 :
/*
 * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
 *
 * Once a publisher status message has been received, decide whether all
 * remote transactions that were in progress at the first status request have
 * completed; if so, proceed to RDT_WAIT_FOR_LOCAL_FLUSH, otherwise request
 * the publisher status again.
 */
static void
wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
						  bool status_received)
{
	/*
	 * Return if we have requested but not yet received the publisher status.
	 */
	if (!status_received)
		return;

	/*
	 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
	 * retaining conflict information for this worker.
	 */
	if (should_stop_conflict_info_retention(rdt_data))
	{
		rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
		return;
	}

	/*
	 * On the first status reply of this cycle, remember the publisher's
	 * next XID; all remote transactions older than it must finish before we
	 * can advance.
	 */
	if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
		rdt_data->remote_wait_for = rdt_data->remote_nextxid;

	/*
	 * Check if all remote concurrent transactions that were active at the
	 * first status request have now completed. If completed, proceed to the
	 * next phase; otherwise, continue checking the publisher status until
	 * these transactions finish.
	 *
	 * It's possible that transactions in the commit phase during the last
	 * cycle have now finished committing, but remote_oldestxid remains older
	 * than remote_wait_for. This can happen if some old transaction came in
	 * the commit phase when we requested status in this cycle. We do not
	 * handle this case explicitly as it's rare and the benefit doesn't
	 * justify the required complexity. Tracking would require either caching
	 * all xids at the publisher or sending them to subscribers. The condition
	 * will resolve naturally once the remaining transactions are finished.
	 *
	 * Directly advancing the non-removable transaction ID is possible if
	 * there are no activities on the publisher since the last advancement
	 * cycle. However, it requires maintaining two fields, last_remote_nextxid
	 * and last_remote_lsn, within the structure for comparison with the
	 * current cycle's values. Considering the minimal cost of continuing in
	 * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
	 * advance the transaction ID here.
	 */
	if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
										  rdt_data->remote_oldestxid))
		rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
	else
		rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;

	/* process the next phase */
	process_rdt_phase_transition(rdt_data, false);
}
4613 :
/*
 * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
 *
 * Waits until the local flush position has passed the publisher-reported
 * remote_lsn, then publishes candidate_xid as the worker's new
 * oldest_nonremovable_xid and restarts the advancement cycle.
 */
static void
wait_for_local_flush(RetainDeadTuplesData *rdt_data)
{
	Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) &&
		   TransactionIdIsValid(rdt_data->candidate_xid));

	/*
	 * We expect the publisher and subscriber clocks to be in sync using time
	 * sync service like NTP. Otherwise, we will advance this worker's
	 * oldest_nonremovable_xid prematurely, leading to the removal of rows
	 * required to detect update_deleted reliably. This check primarily
	 * addresses scenarios where the publisher's clock falls behind; if the
	 * publisher's clock is ahead, subsequent transactions will naturally bear
	 * later commit timestamps, conforming to the design outlined atop
	 * worker.c.
	 *
	 * XXX Consider waiting for the publisher's clock to catch up with the
	 * subscriber's before proceeding to the next phase.
	 */
	if (TimestampDifferenceExceeds(rdt_data->reply_time,
								   rdt_data->candidate_xid_time, 0))
		ereport(ERROR,
				errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
				errdetail_internal("The clock on the publisher is behind that of the subscriber."));

	/*
	 * Do not attempt to advance the non-removable transaction ID when table
	 * sync is in progress. During this time, changes from a single
	 * transaction may be applied by multiple table sync workers corresponding
	 * to the target tables. So, it's necessary for all table sync workers to
	 * apply and flush the corresponding changes before advancing the
	 * transaction ID, otherwise, dead tuples that are still needed for
	 * conflict detection in table sync workers could be removed prematurely.
	 * However, confirming the apply and flush progress across all table sync
	 * workers is complex and not worth the effort, so we simply return if not
	 * all tables are in the READY state.
	 *
	 * Advancing the transaction ID is necessary even when no tables are
	 * currently subscribed, to avoid retaining dead tuples unnecessarily.
	 * While it might seem safe to skip all phases and directly assign
	 * candidate_xid to oldest_nonremovable_xid during the
	 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
	 * concurrently add tables to the subscription, the apply worker may not
	 * process invalidations in time. Consequently,
	 * HasSubscriptionTablesCached() might miss the new tables, leading to
	 * premature advancement of oldest_nonremovable_xid.
	 *
	 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
	 * invalidations are guaranteed to be processed before applying changes
	 * from newly added tables while waiting for the local flush to reach
	 * remote_lsn.
	 *
	 * Additionally, even if we check for subscription tables during
	 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
	 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
	 * subscription tables at this stage to prevent unnecessary tuple
	 * retention.
	 */
	if (HasSubscriptionTablesCached() && !AllTablesyncsReady())
	{
		TimestampTz now;

		/* Reuse last_recv_time if available to avoid a clock read. */
		now = rdt_data->last_recv_time
			? rdt_data->last_recv_time : GetCurrentTimestamp();

		/*
		 * Record the time spent waiting for table sync, it is needed for the
		 * timeout check in should_stop_conflict_info_retention().
		 */
		rdt_data->table_sync_wait_time =
			TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);

		return;
	}

	/*
	 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
	 * retaining conflict information for this worker.
	 */
	if (should_stop_conflict_info_retention(rdt_data))
	{
		rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
		return;
	}

	/*
	 * Update and check the remote flush position if we are applying changes
	 * in a loop. This is done at most once per WalWriterDelay to avoid
	 * performing costly operations in get_flush_position() too frequently
	 * during change application.
	 */
	if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
		TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
								   rdt_data->last_recv_time, WalWriterDelay))
	{
		XLogRecPtr	writepos;
		XLogRecPtr	flushpos;
		bool		have_pending_txes;

		/* Fetch the latest remote flush position */
		get_flush_position(&writepos, &flushpos, &have_pending_txes);

		if (flushpos > last_flushpos)
			last_flushpos = flushpos;

		rdt_data->flushpos_update_time = rdt_data->last_recv_time;
	}

	/* Return to wait for the changes to be applied */
	if (last_flushpos < rdt_data->remote_lsn)
		return;

	/*
	 * Reaching this point implies should_stop_conflict_info_retention()
	 * returned false earlier, meaning that the most recent duration for
	 * advancing the non-removable transaction ID is within the
	 * max_retention_duration or max_retention_duration is set to 0.
	 *
	 * Therefore, if conflict info retention was previously stopped due to a
	 * timeout, it is now safe to resume retention.
	 */
	if (!MySubscription->retentionactive)
	{
		rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
		return;
	}

	/*
	 * Reaching here means the remote WAL position has been received, and all
	 * transactions up to that position on the publisher have been applied and
	 * flushed locally. So, we can advance the non-removable transaction ID.
	 */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
		 LSN_FORMAT_ARGS(rdt_data->remote_lsn),
		 rdt_data->candidate_xid);

	/* Notify launcher to update the xmin of the conflict slot */
	ApplyLauncherWakeup();

	/* Start a fresh advancement cycle from RDT_GET_CANDIDATE_XID. */
	reset_retention_data_fields(rdt_data);

	/* process the next phase */
	process_rdt_phase_transition(rdt_data, false);
}
4765 :
4766 : /*
4767 : * Check whether conflict information retention should be stopped due to
4768 : * exceeding the maximum wait time (max_retention_duration).
4769 : *
4770 : * If retention should be stopped, return true. Otherwise, return false.
4771 : */
4772 : static bool
4773 11151 : should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4774 : {
4775 : TimestampTz now;
4776 :
4777 : Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4778 : Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
4779 : rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
4780 :
4781 11151 : if (!MySubscription->maxretention)
4782 11150 : return false;
4783 :
4784 : /*
4785 : * Use last_recv_time when applying changes in the loop to avoid
4786 : * unnecessary system time retrieval. If last_recv_time is not available,
4787 : * obtain the current timestamp.
4788 : */
4789 1 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4790 :
4791 : /*
4792 : * Return early if the wait time has not exceeded the configured maximum
4793 : * (max_retention_duration). Time spent waiting for table synchronization
4794 : * is excluded from this calculation, as it occurs infrequently.
4795 : */
4796 1 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4797 1 : MySubscription->maxretention +
4798 1 : rdt_data->table_sync_wait_time))
4799 0 : return false;
4800 :
4801 1 : return true;
4802 : }
4803 :
/*
 * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
 *
 * Marks the subscription as no longer retaining conflict information and
 * invalidates this worker's oldest_nonremovable_xid, then resets the phase
 * machine so a later cycle can attempt to resume retention.
 */
static void
stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
{
	/* Stop retention if not yet */
	if (MySubscription->retentionactive)
	{
		/*
		 * If the retention status cannot be updated (e.g., due to active
		 * transaction), skip further processing to avoid inconsistent
		 * retention behavior.
		 */
		if (!update_retention_status(false))
			return;

		/* Invalidate the advertised xid under the worker's spinlock. */
		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
		MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
		SpinLockRelease(&MyLogicalRepWorker->relmutex);

		ereport(LOG,
				errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
					   MySubscription->name),
				errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
	}

	Assert(!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid));

	/*
	 * If retention has been stopped, reset to the initial phase to retry
	 * resuming retention. This reset is required to recalculate the current
	 * wait time and resume retention if the time falls within
	 * max_retention_duration.
	 */
	reset_retention_data_fields(rdt_data);
}
4841 :
/*
 * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
 *
 * Re-enables conflict information retention in the catalog and then restarts
 * this worker (does not return if the restart proceeds).
 */
static void
resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
{
	/* We can't resume retention without updating retention status. */
	if (!update_retention_status(true))
		return;

	ereport(LOG,
			errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
				   MySubscription->name),
			MySubscription->maxretention
			? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
			: errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));

	/*
	 * Restart the worker to let the launcher initialize
	 * oldest_nonremovable_xid at startup.
	 *
	 * While it's technically possible to derive this value on-the-fly using
	 * the conflict detection slot's xmin, doing so risks a race condition:
	 * the launcher might clean slot.xmin just after retention resumes. This
	 * would make oldest_nonremovable_xid unreliable, especially during xid
	 * wraparound.
	 *
	 * Although this can be prevented by introducing heavy weight locking, the
	 * complexity it will bring doesn't seem worthwhile given how rarely
	 * retention is resumed.
	 */
	apply_worker_exit();
}
4875 :
/*
 * Updates pg_subscription.subretentionactive to the given value within a
 * new transaction.
 *
 * If already inside an active transaction, skips the update and returns
 * false.
 *
 * Returns true if the update is successfully performed.  On success, the
 * in-memory MySubscription->retentionactive is updated to match the catalog,
 * and the launcher is woken so it can adjust the conflict slot.
 */
static bool
update_retention_status(bool active)
{
	/*
	 * Do not update the catalog during an active transaction. The transaction
	 * may be started during change application, leading to a possible
	 * rollback of catalog updates if the application fails subsequently.
	 */
	if (IsTransactionState())
		return false;

	StartTransactionCommand();

	/*
	 * Updating pg_subscription might involve TOAST table access, so ensure we
	 * have a valid snapshot.
	 */
	PushActiveSnapshot(GetTransactionSnapshot());

	/* Update pg_subscription.subretentionactive */
	UpdateDeadTupleRetentionStatus(MySubscription->oid, active);

	PopActiveSnapshot();
	CommitTransactionCommand();

	/* Notify launcher to update the conflict slot */
	ApplyLauncherWakeup();

	/* Keep the cached subscription in sync with the committed catalog row. */
	MySubscription->retentionactive = active;

	return true;
}
4917 :
4918 : /*
4919 : * Reset all data fields of RetainDeadTuplesData except those used to
4920 : * determine the timing for the next round of transaction ID advancement. We
4921 : * can even use flushpos_update_time in the next round to decide whether to get
4922 : * the latest flush position.
4923 : */
4924 : static void
4925 41 : reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
4926 : {
4927 41 : rdt_data->phase = RDT_GET_CANDIDATE_XID;
4928 41 : rdt_data->remote_lsn = InvalidXLogRecPtr;
4929 41 : rdt_data->remote_oldestxid = InvalidFullTransactionId;
4930 41 : rdt_data->remote_nextxid = InvalidFullTransactionId;
4931 41 : rdt_data->reply_time = 0;
4932 41 : rdt_data->remote_wait_for = InvalidFullTransactionId;
4933 41 : rdt_data->candidate_xid = InvalidTransactionId;
4934 41 : rdt_data->table_sync_wait_time = 0;
4935 41 : }
4936 :
4937 : /*
4938 : * Adjust the interval for advancing non-removable transaction IDs.
4939 : *
4940 : * If there is no activity on the node or retention has been stopped, we
4941 : * progressively double the interval used to advance non-removable transaction
4942 : * ID. This helps conserve CPU and network resources when there's little benefit
4943 : * to frequent updates.
4944 : *
4945 : * The interval is capped by the lowest of the following:
4946 : * - wal_receiver_status_interval (if set and retention is active),
4947 : * - a default maximum of 3 minutes,
4948 : * - max_retention_duration (if retention is active).
4949 : *
4950 : * This ensures the interval never exceeds the retention boundary, even if other
4951 : * limits are higher. Once activity resumes on the node and the retention is
4952 : * active, the interval is reset to lesser of 100ms and max_retention_duration,
4953 : * allowing timely advancement of non-removable transaction ID.
4954 : *
4955 : * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4956 : * consider the other interval or a separate GUC if the need arises.
4957 : */
4958 : static void
4959 186 : adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
4960 : {
4961 186 : if (rdt_data->xid_advance_interval && !new_xid_found)
4962 0 : {
4963 0 : int max_interval = wal_receiver_status_interval
4964 0 : ? wal_receiver_status_interval * 1000
4965 0 : : MAX_XID_ADVANCE_INTERVAL;
4966 :
4967 : /*
4968 : * No new transaction ID has been assigned since the last check, so
4969 : * double the interval, but not beyond the maximum allowable value.
4970 : */
4971 0 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4972 : max_interval);
4973 : }
4974 186 : else if (rdt_data->xid_advance_interval &&
4975 0 : !MySubscription->retentionactive)
4976 : {
4977 : /*
4978 : * Retention has been stopped, so double the interval-capped at a
4979 : * maximum of 3 minutes. The wal_receiver_status_interval is
4980 : * intentionally not used as a upper bound, since the likelihood of
4981 : * retention resuming is lower than that of general activity resuming.
4982 : */
4983 0 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4984 : MAX_XID_ADVANCE_INTERVAL);
4985 : }
4986 : else
4987 : {
4988 : /*
4989 : * A new transaction ID was found or the interval is not yet
4990 : * initialized, so set the interval to the minimum value.
4991 : */
4992 186 : rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
4993 : }
4994 :
4995 : /*
4996 : * Ensure the wait time remains within the maximum retention time limit
4997 : * when retention is active.
4998 : */
4999 186 : if (MySubscription->retentionactive)
5000 185 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
5001 : MySubscription->maxretention);
5002 186 : }
5003 :
5004 : /*
5005 : * Exit routine for apply workers due to subscription parameter changes.
5006 : */
5007 : static void
5008 45 : apply_worker_exit(void)
5009 : {
5010 45 : if (am_parallel_apply_worker())
5011 : {
5012 : /*
5013 : * Don't stop the parallel apply worker as the leader will detect the
5014 : * subscription parameter change and restart logical replication later
5015 : * anyway. This also prevents the leader from reporting errors when
5016 : * trying to communicate with a stopped parallel apply worker, which
5017 : * would accidentally disable subscriptions if disable_on_error was
5018 : * set.
5019 : */
5020 0 : return;
5021 : }
5022 :
5023 : /*
5024 : * Reset the last-start time for this apply worker so that the launcher
5025 : * will restart it without waiting for wal_retrieve_retry_interval if the
5026 : * subscription is still active, and so that we won't leak that hash table
5027 : * entry if it isn't.
5028 : */
5029 45 : if (am_leader_apply_worker())
5030 45 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5031 :
5032 45 : proc_exit(0);
5033 : }
5034 :
/*
 * Reread subscription info if needed.
 *
 * For significant changes, we react by exiting the current process; a new
 * one will be launched afterwards if needed.  For benign changes (e.g.
 * synchronous_commit, wal_receiver_timeout) the cached subscription is simply
 * refreshed in place.
 */
void
maybe_reread_subscription(void)
{
	MemoryContext oldctx;
	Subscription *newsub;
	bool		started_tx = false;

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/* Ensure allocations in permanent context. */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);

	/*
	 * Exit if the subscription was removed. This normally should not happen
	 * as the worker gets killed during DROP SUBSCRIPTION.
	 */
	if (!newsub)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
						MySubscription->name)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (am_leader_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

		proc_exit(0);
	}

	/* Exit if the subscription was disabled. */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
						MySubscription->name)));

		apply_worker_exit();
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/* two-phase cannot be altered while the worker is running */
	Assert(newsub->twophasestate == MySubscription->twophasestate);

	/*
	 * Exit if any parameter that affects the remote connection was changed.
	 * The launcher will start a new worker but note that the parallel apply
	 * worker won't restart if the streaming option's value is changed from
	 * 'parallel' to any other value or the server decides not to stream the
	 * in-progress transaction.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
		strcmp(newsub->name, MySubscription->name) != 0 ||
		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
		newsub->binary != MySubscription->binary ||
		newsub->stream != MySubscription->stream ||
		newsub->passwordrequired != MySubscription->passwordrequired ||
		strcmp(newsub->origin, MySubscription->origin) != 0 ||
		newsub->owner != MySubscription->owner ||
		!equal(newsub->publications, MySubscription->publications))
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
							MySubscription->name)));
		else
			ereport(LOG,
					(errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
							MySubscription->name)));

		apply_worker_exit();
	}

	/*
	 * Exit if the subscription owner's superuser privileges have been
	 * revoked.
	 */
	if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
						   MySubscription->name));
		else
			ereport(LOG,
					errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
						   MySubscription->name));

		apply_worker_exit();
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	FreeSubscription(MySubscription);
	MySubscription = newsub;

	MemoryContextSwitchTo(oldctx);

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Change wal_receiver_timeout according to the user's wishes */
	set_wal_receiver_timeout();

	if (started_tx)
		CommitTransactionCommand();

	MySubscriptionValid = true;
}
5169 :
5170 : /*
5171 : * Change wal_receiver_timeout to MySubscription->walrcvtimeout.
5172 : */
5173 : static void
5174 555 : set_wal_receiver_timeout(void)
5175 : {
5176 : bool parsed;
5177 : int val;
5178 555 : int prev_timeout = wal_receiver_timeout;
5179 :
5180 : /*
5181 : * Set the wal_receiver_timeout GUC to MySubscription->walrcvtimeout,
5182 : * which comes from the subscription's wal_receiver_timeout option. If the
5183 : * value is -1, reset the GUC to its default, meaning it will inherit from
5184 : * the server config, command line, or role/database settings.
5185 : */
5186 555 : parsed = parse_int(MySubscription->walrcvtimeout, &val, 0, NULL);
5187 555 : if (parsed && val == -1)
5188 555 : SetConfigOption("wal_receiver_timeout", NULL,
5189 : PGC_BACKEND, PGC_S_SESSION);
5190 : else
5191 0 : SetConfigOption("wal_receiver_timeout", MySubscription->walrcvtimeout,
5192 : PGC_BACKEND, PGC_S_SESSION);
5193 :
5194 : /*
5195 : * Log the wal_receiver_timeout setting (in milliseconds) as a debug
5196 : * message when it changes, to verify it was set correctly.
5197 : */
5198 555 : if (prev_timeout != wal_receiver_timeout)
5199 0 : elog(DEBUG1, "logical replication worker for subscription \"%s\" wal_receiver_timeout: %d ms",
5200 : MySubscription->name, wal_receiver_timeout);
5201 555 : }
5202 :
5203 : /*
5204 : * Callback from subscription syscache invalidation. Also needed for server or
5205 : * user mapping invalidation, which can change the connection information for
5206 : * subscriptions that connect using a server object.
5207 : */
5208 : static void
5209 92 : subscription_change_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
5210 : {
5211 92 : MySubscriptionValid = false;
5212 92 : }
5213 :
/*
 * subxact_info_write
 *	  Store information about subxacts for a toplevel transaction.
 *
 * For each subxact we store offset of its first change in the main file.
 * The file is always over-written as a whole.
 *
 * The in-memory subxact_data is released (via cleanup_subxact_info) on every
 * path out of this function.
 *
 * XXX We should only store subxacts that were not aborted yet.
 */
static void
subxact_info_write(Oid subid, TransactionId xid)
{
	char		path[MAXPGPATH];
	Size		len;
	BufFile    *fd;

	Assert(TransactionIdIsValid(xid));

	/* construct the subxact filename */
	subxact_filename(path, subid, xid);

	/*
	 * With no subxacts to remember there is nothing to write; delete the
	 * subxacts file if it exists (missing_ok = true).
	 */
	if (subxact_data.nsubxacts == 0)
	{
		cleanup_subxact_info();
		BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);

		return;
	}

	/*
	 * Create the subxact file if it is not already created, otherwise open
	 * the existing file.  (Open with missing_ok = true returns NULL rather
	 * than erroring when the file does not exist yet.)
	 */
	fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
							true);
	if (fd == NULL)
		fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);

	len = sizeof(SubXactInfo) * subxact_data.nsubxacts;

	/* Write the subxact count and subxact info */
	BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
	BufFileWrite(fd, subxact_data.subxacts, len);

	BufFileClose(fd);

	/* free the memory allocated for subxact info */
	cleanup_subxact_info();
}
5264 :
/*
 * subxact_info_read
 *	  Restore information about subxacts of a streamed transaction.
 *
 * Read information about subxacts into the structure subxact_data that can be
 * used later.  The counterpart writer is subxact_info_write().
 */
static void
subxact_info_read(Oid subid, TransactionId xid)
{
	char		path[MAXPGPATH];
	Size		len;
	BufFile    *fd;
	MemoryContext oldctx;

	/* Caller must have cleaned up any previously-read subxact info. */
	Assert(!subxact_data.subxacts);
	Assert(subxact_data.nsubxacts == 0);
	Assert(subxact_data.nsubxacts_max == 0);

	/*
	 * If the subxact file doesn't exist that means we don't have any subxact
	 * info.
	 */
	subxact_filename(path, subid, xid);
	fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
							true);
	if (fd == NULL)
		return;

	/* read number of subxact items */
	BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));

	len = sizeof(SubXactInfo) * subxact_data.nsubxacts;

	/* we keep the maximum as a power of 2 */
	subxact_data.nsubxacts_max = 1 << pg_ceil_log2_32(subxact_data.nsubxacts);

	/*
	 * Allocate subxact information in the logical streaming context. We need
	 * this information during the complete stream so that we can add the sub
	 * transaction info to this. On stream stop we will flush this information
	 * to the subxact file and reset the logical streaming context.
	 */
	oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
	subxact_data.subxacts = palloc_array(SubXactInfo,
										 subxact_data.nsubxacts_max);
	MemoryContextSwitchTo(oldctx);

	/* Read the array itself (len is 0 when nsubxacts is 0). */
	if (len > 0)
		BufFileReadExact(fd, subxact_data.subxacts, len);

	BufFileClose(fd);
}
5318 :
/*
 * subxact_info_add
 *	  Add information about a subxact (offset in the main file).
 *
 * Records the position in the changes file at which the given subtransaction's
 * first change starts, so the file can later be truncated back to that point
 * if the subtransaction aborts.  Changes belonging to the toplevel xact, or to
 * a subxact we have already recorded, are ignored.
 */
static void
subxact_info_add(TransactionId xid)
{
	SubXactInfo *subxacts = subxact_data.subxacts;
	int64		i;

	/* We must have a valid top level stream xid and a stream fd. */
	Assert(TransactionIdIsValid(stream_xid));
	Assert(stream_fd != NULL);

	/*
	 * If the XID matches the toplevel transaction, we don't want to add it.
	 */
	if (stream_xid == xid)
		return;

	/*
	 * In most cases we're checking the same subxact as we've already seen in
	 * the last call, so make sure to ignore it (this change comes later).
	 */
	if (subxact_data.subxact_last == xid)
		return;

	/* OK, remember we're processing this XID. */
	subxact_data.subxact_last = xid;

	/*
	 * Check if the transaction is already present in the array of subxact. We
	 * intentionally scan the array from the tail, because we're likely adding
	 * a change for the most recent subtransactions.
	 *
	 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
	 * would allow us to use binary search here.
	 */
	for (i = subxact_data.nsubxacts; i > 0; i--)
	{
		/* found, so we're done */
		if (subxacts[i - 1].xid == xid)
			return;
	}

	/* This is a new subxact, so we need to add it to the array. */
	if (subxact_data.nsubxacts == 0)
	{
		MemoryContext oldctx;

		/* first subxact: start with room for 128 entries */
		subxact_data.nsubxacts_max = 128;

		/*
		 * Allocate this memory for subxacts in per-stream context, see
		 * subxact_info_read.
		 */
		oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
		subxacts = palloc_array(SubXactInfo, subxact_data.nsubxacts_max);
		MemoryContextSwitchTo(oldctx);
	}
	else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
	{
		/* array full: double the capacity */
		subxact_data.nsubxacts_max *= 2;
		subxacts = repalloc_array(subxacts, SubXactInfo,
								  subxact_data.nsubxacts_max);
	}

	subxacts[subxact_data.nsubxacts].xid = xid;

	/*
	 * Get the current offset of the stream file and store it as offset of
	 * this subxact.
	 */
	BufFileTell(stream_fd,
				&subxacts[subxact_data.nsubxacts].fileno,
				&subxacts[subxact_data.nsubxacts].offset);

	subxact_data.nsubxacts++;
	subxact_data.subxacts = subxacts;
}
5399 :
5400 : /* format filename for file containing the info about subxacts */
5401 : static inline void
5402 747 : subxact_filename(char *path, Oid subid, TransactionId xid)
5403 : {
5404 747 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5405 747 : }
5406 :
5407 : /* format filename for file containing serialized changes */
5408 : static inline void
5409 438 : changes_filename(char *path, Oid subid, TransactionId xid)
5410 : {
5411 438 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5412 438 : }
5413 :
5414 : /*
5415 : * stream_cleanup_files
5416 : * Cleanup files for a subscription / toplevel transaction.
5417 : *
5418 : * Remove files with serialized changes and subxact info for a particular
5419 : * toplevel transaction. Each subscription has a separate set of files
5420 : * for any toplevel transaction.
5421 : */
5422 : void
5423 31 : stream_cleanup_files(Oid subid, TransactionId xid)
5424 : {
5425 : char path[MAXPGPATH];
5426 :
5427 : /* Delete the changes file. */
5428 31 : changes_filename(path, subid, xid);
5429 31 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
5430 :
5431 : /* Delete the subxact file, if it exists. */
5432 31 : subxact_filename(path, subid, xid);
5433 31 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5434 31 : }
5435 :
/*
 * stream_open_file
 *	  Open a file that we'll use to serialize changes for a toplevel
 *	  transaction.
 *
 * Open a file for streamed changes from a toplevel transaction identified
 * by stream_xid (global variable). If it's the first chunk of streamed
 * changes for this transaction, create the buffile, otherwise open the
 * previously created file.
 *
 * The opened file is tracked in the global stream_fd and is later closed by
 * stream_close_file().
 */
static void
stream_open_file(Oid subid, TransactionId xid, bool first_segment)
{
	char		path[MAXPGPATH];
	MemoryContext oldcxt;

	/* No stream file may already be open. */
	Assert(OidIsValid(subid));
	Assert(TransactionIdIsValid(xid));
	Assert(stream_fd == NULL);


	changes_filename(path, subid, xid);
	elog(DEBUG1, "opening file \"%s\" for streamed changes", path);

	/*
	 * Create/open the buffiles under the logical streaming context so that we
	 * have those files until stream stop.
	 */
	oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);

	/*
	 * If this is the first streamed segment, create the changes file.
	 * Otherwise, just open the file for writing, in append mode.
	 */
	if (first_segment)
		stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
										 path);
	else
	{
		/*
		 * Open the file and seek to the end of the file because we always
		 * append the changes file.
		 */
		stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
									   path, O_RDWR, false);
		BufFileSeek(stream_fd, 0, 0, SEEK_END);
	}

	MemoryContextSwitchTo(oldcxt);
}
5486 :
5487 : /*
5488 : * stream_close_file
5489 : * Close the currently open file with streamed changes.
5490 : */
5491 : static void
5492 393 : stream_close_file(void)
5493 : {
5494 : Assert(stream_fd != NULL);
5495 :
5496 393 : BufFileClose(stream_fd);
5497 :
5498 393 : stream_fd = NULL;
5499 393 : }
5500 :
/*
 * stream_write_change
 *	  Serialize a change to a file for the current toplevel transaction.
 *
 * The change is serialized in a simple format, with length (not including
 * the length), action code (identifying the message type) and message
 * contents (without the subxact TransactionId value).
 *
 * Note: only the unread part of the buffer (from s->cursor onwards) is
 * written; the already-consumed prefix is assumed to hold the subxact XID.
 */
static void
stream_write_change(char action, StringInfo s)
{
	int			len;

	Assert(stream_fd != NULL);

	/* total on-disk size, including the action type character */
	len = (s->len - s->cursor) + sizeof(char);

	/* first write the size */
	BufFileWrite(stream_fd, &len, sizeof(len));

	/* then the action */
	BufFileWrite(stream_fd, &action, sizeof(action));

	/* and finally the remaining part of the buffer (after the XID) */
	len = (s->len - s->cursor);

	BufFileWrite(stream_fd, &s->data[s->cursor], len);
}
5530 :
5531 : /*
5532 : * stream_open_and_write_change
5533 : * Serialize a message to a file for the given transaction.
5534 : *
5535 : * This function is similar to stream_write_change except that it will open the
5536 : * target file if not already before writing the message and close the file at
5537 : * the end.
5538 : */
5539 : static void
5540 5 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
5541 : {
5542 : Assert(!in_streamed_transaction);
5543 :
5544 5 : if (!stream_fd)
5545 5 : stream_start_internal(xid, false);
5546 :
5547 5 : stream_write_change(action, s);
5548 5 : stream_stop_internal(xid);
5549 5 : }
5550 :
/*
 * Sets streaming options including replication slot name and origin start
 * position. Workers need these options for logical replication.
 *
 * As a side effect, MyLogicalRepWorker->parallel_apply is set according to
 * the negotiated streaming mode.
 */
void
set_stream_options(WalRcvStreamOptions *options,
				   char *slotname,
				   XLogRecPtr *origin_startpos)
{
	int			server_version;

	options->logical = true;
	options->startpoint = *origin_startpos;
	options->slotname = slotname;

	/* Pick the newest protocol version the publisher can support. */
	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
	options->proto.logical.proto_version =
		server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
		LOGICALREP_PROTO_VERSION_NUM;

	options->proto.logical.publication_names = MySubscription->publications;
	options->proto.logical.binary = MySubscription->binary;

	/*
	 * Assign the appropriate option value for streaming option according to
	 * the 'streaming' mode and the publisher's ability to support that mode.
	 */
	if (server_version >= 160000 &&
		MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
	{
		options->proto.logical.streaming_str = "parallel";
		MyLogicalRepWorker->parallel_apply = true;
	}
	else if (server_version >= 140000 &&
			 MySubscription->stream != LOGICALREP_STREAM_OFF)
	{
		options->proto.logical.streaming_str = "on";
		MyLogicalRepWorker->parallel_apply = false;
	}
	else
	{
		/* Streaming unsupported by the publisher, or disabled by the user. */
		options->proto.logical.streaming_str = NULL;
		MyLogicalRepWorker->parallel_apply = false;
	}

	/* two_phase may be enabled later by the caller, if appropriate. */
	options->proto.logical.twophase = false;
	options->proto.logical.origin = pstrdup(MySubscription->origin);
}
5601 :
5602 : /*
5603 : * Cleanup the memory for subxacts and reset the related variables.
5604 : */
5605 : static inline void
5606 376 : cleanup_subxact_info(void)
5607 : {
5608 376 : if (subxact_data.subxacts)
5609 87 : pfree(subxact_data.subxacts);
5610 :
5611 376 : subxact_data.subxacts = NULL;
5612 376 : subxact_data.subxact_last = InvalidTransactionId;
5613 376 : subxact_data.nsubxacts = 0;
5614 376 : subxact_data.nsubxacts_max = 0;
5615 376 : }
5616 :
/*
 * Common function to run the apply loop with error handling. Disable the
 * subscription, if necessary.
 *
 * Note that we don't handle FATAL errors which are probably because
 * of system resource error and are not repeatable.
 */
void
start_apply(XLogRecPtr origin_startpos)
{
	PG_TRY();
	{
		LogicalRepApplyLoop(origin_startpos);
	}
	PG_CATCH();
	{
		/*
		 * Reset the origin state to prevent the advancement of origin
		 * progress if we fail to apply. Otherwise, this will result in
		 * transaction loss as that transaction won't be sent again by the
		 * server.
		 */
		replorigin_xact_clear(true);

		/* DisableSubscriptionAndExit() does not return: it calls proc_exit. */
		if (MySubscription->disableonerr)
			DisableSubscriptionAndExit();
		else
		{
			/*
			 * Report the worker failed while applying changes. Abort the
			 * current transaction so that the stats message is sent in an
			 * idle state.
			 */
			AbortOutOfAnyTransaction();
			pgstat_report_subscription_error(MySubscription->oid);

			PG_RE_THROW();
		}
	}
	PG_END_TRY();
}
5658 :
/*
 * Runs the leader apply worker.
 *
 * It sets up replication origin, streaming options and then starts streaming.
 * Errors are reported via ereport(ERROR); control normally passes into
 * start_apply() at the end.
 */
static void
run_apply_worker(void)
{
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
	char	   *slotname = NULL;
	WalRcvStreamOptions options;
	ReplOriginId originid;
	TimeLineID	startpointTLI;
	char	   *err;
	bool		must_use_password;

	slotname = MySubscription->slotname;

	/*
	 * This shouldn't happen if the subscription is enabled, but guard against
	 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
	 * slot is NULL.)
	 */
	if (!slotname)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("subscription has no replication slot set")));

	/* Setup replication origin tracking. */
	ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
									   originname, sizeof(originname));
	StartTransactionCommand();
	originid = replorigin_by_name(originname, true);
	if (!OidIsValid(originid))
		originid = replorigin_create(originname);
	replorigin_session_setup(originid, 0);
	replorigin_xact_state.origin = originid;
	origin_startpos = replorigin_session_get_progress(false);
	CommitTransactionCommand();

	/* Is the use of a password mandatory? */
	must_use_password = MySubscription->passwordrequired &&
		!MySubscription->ownersuperuser;

	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
											true, must_use_password,
											MySubscription->name, &err);

	if (LogRepWorkerWalRcvConn == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
						MySubscription->name, err)));

	/*
	 * We don't really use the output identify_system for anything but it does
	 * some initializations on the upstream so let's still call it.
	 */
	(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);

	set_apply_error_context_origin(originname);

	set_stream_options(&options, slotname, &origin_startpos);

	/*
	 * Even when the two_phase mode is requested by the user, it remains as
	 * the tri-state PENDING until all tablesyncs have reached READY state.
	 * Only then, can it become ENABLED.
	 *
	 * Note: If the subscription has no tables then leave the state as
	 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
	 * work.
	 */
	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
		AllTablesyncsReady())
	{
		/* Start streaming with two_phase enabled */
		options.proto.logical.twophase = true;
		walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);

		/* Persist the state change in pg_subscription. */
		StartTransactionCommand();

		/*
		 * Updating pg_subscription might involve TOAST table access, so
		 * ensure we have a valid snapshot.
		 */
		PushActiveSnapshot(GetTransactionSnapshot());

		UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
		MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
		PopActiveSnapshot();
		CommitTransactionCommand();
	}
	else
	{
		walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
	}

	ereport(DEBUG1,
			(errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
							 MySubscription->name,
							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
							 "?")));

	/* Run the main loop. */
	start_apply(origin_startpos);
}
5769 :
/*
 * Common initialization for leader apply worker, parallel apply worker,
 * tablesync worker and sequencesync worker.
 *
 * Initialize the database connection, in-memory subscription and necessary
 * config options.
 *
 * Exits the process (without error) if the subscription was dropped or
 * disabled during startup.
 */
void
InitializeLogRepWorker(void)
{
	MemoryContext oldctx;

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/*
	 * Lock the subscription to prevent it from being concurrently dropped,
	 * then re-verify its existence. After the initialization, the worker will
	 * be terminated gracefully if the subscription is dropped.
	 */
	LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
					 AccessShareLock);
	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
	if (!MySubscription)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (am_leader_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

		proc_exit(0);
	}

	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
						MySubscription->name)));

		apply_worker_exit();
	}

	/*
	 * Restart the worker if retain_dead_tuples was enabled during startup.
	 *
	 * At this point, the replication slot used for conflict detection might
	 * not exist yet, or could be dropped soon if the launcher perceives
	 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
	 * oldest_nonremovable_xid when the slot is absent or at risk of being
	 * dropped, a restart is initiated.
	 *
	 * The oldest_nonremovable_xid should be initialized only when the
	 * subscription's retention is active before launching the worker. See
	 * logicalrep_worker_launch.
	 */
	if (am_leader_apply_worker() &&
		MySubscription->retaindeadtuples &&
		MySubscription->retentionactive &&
		!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
	{
		ereport(LOG,
				errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
					   MySubscription->name, "retain_dead_tuples"));

		apply_worker_exit();
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Change wal_receiver_timeout according to the user's wishes */
	set_wal_receiver_timeout();

	/*
	 * Keep us informed about subscription or role changes. Note that the
	 * role's superuser privilege can be revoked.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);
	/* Changes to foreign servers may affect subscriptions using SERVER. */
	CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
								  subscription_change_cb,
								  (Datum) 0);
	/* Changes to user mappings may affect subscriptions using SERVER. */
	CacheRegisterSyscacheCallback(USERMAPPINGOID,
								  subscription_change_cb,
								  (Datum) 0);

	/*
	 * Changes to FDW connection_function may affect subscriptions using
	 * SERVER.
	 */
	CacheRegisterSyscacheCallback(FOREIGNDATAWRAPPEROID,
								  subscription_change_cb,
								  (Datum) 0);

	/* Role changes (e.g. revoked superuser) also invalidate the cache. */
	CacheRegisterSyscacheCallback(AUTHOID,
								  subscription_change_cb,
								  (Datum) 0);

	if (am_tablesync_worker())
		ereport(LOG,
				errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
					   MySubscription->name,
					   get_rel_name(MyLogicalRepWorker->relid)));
	else if (am_sequencesync_worker())
		ereport(LOG,
				errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
					   MySubscription->name));
	else
		ereport(LOG,
				errmsg("logical replication apply worker for subscription \"%s\" has started",
					   MySubscription->name));

	CommitTransactionCommand();

	/*
	 * Register a callback to reset the origin state before aborting any
	 * pending transaction during shutdown (see ShutdownPostgres()). This will
	 * avoid origin advancement for an incomplete transaction which could
	 * otherwise lead to its loss as such a transaction won't be sent by the
	 * server again.
	 *
	 * Note that even a LOG or DEBUG statement placed after setting the origin
	 * state may process a shutdown signal before committing the current apply
	 * operation. So, it is important to register such a callback here.
	 *
	 * Register this callback here to ensure that all types of logical
	 * replication workers that set up origins and apply remote transactions
	 * are protected.
	 */
	before_shmem_exit(on_exit_clear_xact_state, (Datum) 0);
}
5930 :
5931 : /*
5932 : * Callback on exit to clear transaction-level replication origin state.
5933 : */
5934 : static void
5935 510 : on_exit_clear_xact_state(int code, Datum arg)
5936 : {
5937 510 : replorigin_xact_clear(true);
5938 510 : }
5939 :
/*
 * Common function to setup the leader apply, tablesync and sequencesync worker.
 *
 * Attaches to the given worker slot, sets up signal handling and statistics,
 * loads libpqwalreceiver, and performs the shared initialization in
 * InitializeLogRepWorker().
 */
void
SetupApplyOrSyncWorker(int worker_slot)
{
	/* Attach to slot */
	logicalrep_worker_attach(worker_slot);

	Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker());

	/* Setup signal handling */
	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	BackgroundWorkerUnblockSignals();

	/*
	 * We don't currently need any ResourceOwner in a walreceiver process, but
	 * if we did, we could call CreateAuxProcessResourceOwner here.
	 */

	/* Initialise stats to a sanish value */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	InitializeLogRepWorker();

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  InvalidateSyncingRelStates,
								  (Datum) 0);
}
5981 :
5982 : /* Logical Replication Apply worker entry point */
5983 : void
5984 347 : ApplyWorkerMain(Datum main_arg)
5985 : {
5986 347 : int worker_slot = DatumGetInt32(main_arg);
5987 :
5988 347 : InitializingApplyWorker = true;
5989 :
5990 347 : SetupApplyOrSyncWorker(worker_slot);
5991 :
5992 280 : InitializingApplyWorker = false;
5993 :
5994 280 : run_apply_worker();
5995 :
5996 0 : proc_exit(0);
5997 : }
5998 :
/*
 * After error recovery, disable the subscription in a new transaction
 * and exit cleanly.
 *
 * This function never returns: it ends with proc_exit(0).
 */
void
DisableSubscriptionAndExit(void)
{
	/*
	 * Emit the error message, and recover from the error state to an idle
	 * state
	 */
	HOLD_INTERRUPTS();

	EmitErrorReport();
	AbortOutOfAnyTransaction();
	FlushErrorState();

	RESUME_INTERRUPTS();

	/*
	 * Report the worker failed during sequence synchronization, table
	 * synchronization, or apply.
	 */
	pgstat_report_subscription_error(MyLogicalRepWorker->subid);

	/* Disable the subscription */
	StartTransactionCommand();

	/*
	 * Updating pg_subscription might involve TOAST table access, so ensure we
	 * have a valid snapshot.
	 */
	PushActiveSnapshot(GetTransactionSnapshot());

	DisableSubscription(MySubscription->oid);
	PopActiveSnapshot();
	CommitTransactionCommand();

	/* Ensure we remove no-longer-useful entry for worker's start time */
	if (am_leader_apply_worker())
		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

	/* Notify the subscription has been disabled and exit */
	ereport(LOG,
			errmsg("subscription \"%s\" has been disabled because of an error",
				   MySubscription->name));

	/*
	 * Skip the track_commit_timestamp check when disabling the worker due to
	 * an error, as verifying commit timestamps is unnecessary in this
	 * context.
	 */
	CheckSubDeadTupleRetention(false, true, WARNING,
							   MySubscription->retaindeadtuples,
							   MySubscription->retentionactive, false);

	proc_exit(0);
}
6057 :
6058 : /*
6059 : * Is current process a logical replication worker?
6060 : */
6061 : bool
6062 2187 : IsLogicalWorker(void)
6063 : {
6064 2187 : return MyLogicalRepWorker != NULL;
6065 : }
6066 :
6067 : /*
6068 : * Is current process a logical replication parallel apply worker?
6069 : */
6070 : bool
6071 1499 : IsLogicalParallelApplyWorker(void)
6072 : {
6073 1499 : return IsLogicalWorker() && am_parallel_apply_worker();
6074 : }
6075 :
/*
 * Start skipping changes of the transaction if the given LSN matches the
 * LSN specified by subscription's skiplsn.
 *
 * finish_lsn is the remote transaction's commit/prepare LSN, compared against
 * MySubscription->skiplsn.  On a match, skip_xact_finish_lsn is set, which
 * makes is_skipping_changes() true until stop_skipping_changes() resets it.
 */
static void
maybe_start_skipping_changes(XLogRecPtr finish_lsn)
{
	Assert(!is_skipping_changes());
	Assert(!in_remote_transaction);
	Assert(!in_streamed_transaction);

	/*
	 * Quick return if it's not requested to skip this transaction. This
	 * function is called for every remote transaction and we assume that
	 * skipping the transaction is not used often.
	 */
	if (likely(!XLogRecPtrIsValid(MySubscription->skiplsn) ||
			   MySubscription->skiplsn != finish_lsn))
		return;

	/* Start skipping all changes of this transaction */
	skip_xact_finish_lsn = finish_lsn;

	ereport(LOG,
			errmsg("logical replication starts skipping transaction at LSN %X/%08X",
				   LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
}
6103 :
6104 : /*
6105 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
6106 : */
6107 : static void
6108 30 : stop_skipping_changes(void)
6109 : {
6110 30 : if (!is_skipping_changes())
6111 27 : return;
6112 :
6113 3 : ereport(LOG,
6114 : errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6115 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6116 :
6117 : /* Stop skipping changes */
6118 3 : skip_xact_finish_lsn = InvalidXLogRecPtr;
6119 : }
6120 :
6121 : /*
6122 : * Clear subskiplsn of pg_subscription catalog.
6123 : *
6124 : * finish_lsn is the transaction's finish LSN that is used to check if the
6125 : * subskiplsn matches it. If not matched, we raise a warning when clearing the
6126 : * subskiplsn in order to inform users for cases e.g., where the user mistakenly
6127 : * specified the wrong subskiplsn.
6128 : */
6129 : static void
6130 537 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
6131 : {
6132 : Relation rel;
6133 : Form_pg_subscription subform;
6134 : HeapTuple tup;
6135 537 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
6136 537 : bool started_tx = false;
6137 :
6138 537 : if (likely(!XLogRecPtrIsValid(myskiplsn)) || am_parallel_apply_worker())
6139 534 : return;
6140 :
6141 3 : if (!IsTransactionState())
6142 : {
6143 1 : StartTransactionCommand();
6144 1 : started_tx = true;
6145 : }
6146 :
6147 : /*
6148 : * Updating pg_subscription might involve TOAST table access, so ensure we
6149 : * have a valid snapshot.
6150 : */
6151 3 : PushActiveSnapshot(GetTransactionSnapshot());
6152 :
6153 : /*
6154 : * Protect subskiplsn of pg_subscription from being concurrently updated
6155 : * while clearing it.
6156 : */
6157 3 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
6158 : AccessShareLock);
6159 :
6160 3 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
6161 :
6162 : /* Fetch the existing tuple. */
6163 3 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
6164 : ObjectIdGetDatum(MySubscription->oid));
6165 :
6166 3 : if (!HeapTupleIsValid(tup))
6167 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6168 :
6169 3 : subform = (Form_pg_subscription) GETSTRUCT(tup);
6170 :
6171 : /*
6172 : * Clear the subskiplsn. If the user has already changed subskiplsn before
6173 : * clearing it we don't update the catalog and the replication origin
6174 : * state won't get advanced. So in the worst case, if the server crashes
6175 : * before sending an acknowledgment of the flush position the transaction
6176 : * will be sent again and the user needs to set subskiplsn again. We can
6177 : * reduce the possibility by logging a replication origin WAL record to
6178 : * advance the origin LSN instead but there is no way to advance the
6179 : * origin timestamp and it doesn't seem to be worth doing anything about
6180 : * it since it's a very rare case.
6181 : */
6182 3 : if (subform->subskiplsn == myskiplsn)
6183 : {
6184 : bool nulls[Natts_pg_subscription];
6185 : bool replaces[Natts_pg_subscription];
6186 : Datum values[Natts_pg_subscription];
6187 :
6188 3 : memset(values, 0, sizeof(values));
6189 3 : memset(nulls, false, sizeof(nulls));
6190 3 : memset(replaces, false, sizeof(replaces));
6191 :
6192 : /* reset subskiplsn */
6193 3 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
6194 3 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
6195 :
6196 3 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
6197 : replaces);
6198 3 : CatalogTupleUpdate(rel, &tup->t_self, tup);
6199 :
6200 3 : if (myskiplsn != finish_lsn)
6201 0 : ereport(WARNING,
6202 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6203 : errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6204 : LSN_FORMAT_ARGS(finish_lsn),
6205 : LSN_FORMAT_ARGS(myskiplsn)));
6206 : }
6207 :
6208 3 : heap_freetuple(tup);
6209 3 : table_close(rel, NoLock);
6210 :
6211 3 : PopActiveSnapshot();
6212 :
6213 3 : if (started_tx)
6214 1 : CommitTransactionCommand();
6215 : }
6216 :
/*
 * Error callback to give more context info about the change being applied.
 *
 * Emits an errcontext() line built from apply_error_callback_arg: the
 * replication origin, the message type being processed, and - when known -
 * the remote transaction ID, its finish LSN, the target relation, and the
 * column being processed.  The branch ladder below simply selects the
 * format string matching which of those fields are currently set.
 */
void
apply_error_callback(void *arg)
{
	ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;

	/* No message is being processed; nothing useful to report. */
	if (apply_error_callback_arg.command == 0)
		return;

	Assert(errarg->origin_name);

	/* First dimension: do we know the target relation? */
	if (errarg->rel == NULL)
	{
		/* Second dimension: remote xid known?  Third: finish LSN known? */
		if (!TransactionIdIsValid(errarg->remote_xid))
			errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
					   errarg->origin_name,
					   logicalrep_message_type(errarg->command));
		else if (!XLogRecPtrIsValid(errarg->finish_lsn))
			errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
					   errarg->origin_name,
					   logicalrep_message_type(errarg->command),
					   errarg->remote_xid);
		else
			errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
					   errarg->origin_name,
					   logicalrep_message_type(errarg->command),
					   errarg->remote_xid,
					   LSN_FORMAT_ARGS(errarg->finish_lsn));
	}
	else
	{
		/* Relation is known; also report the column if we have one. */
		if (errarg->remote_attnum < 0)
		{
			if (!XLogRecPtrIsValid(errarg->finish_lsn))
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->remote_xid);
			else
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->remote_xid,
						   LSN_FORMAT_ARGS(errarg->finish_lsn));
		}
		else
		{
			if (!XLogRecPtrIsValid(errarg->finish_lsn))
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->rel->remoterel.attnames[errarg->remote_attnum],
						   errarg->remote_xid);
			else
				errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
						   errarg->origin_name,
						   logicalrep_message_type(errarg->command),
						   errarg->rel->remoterel.nspname,
						   errarg->rel->remoterel.relname,
						   errarg->rel->remoterel.attnames[errarg->remote_attnum],
						   errarg->remote_xid,
						   LSN_FORMAT_ARGS(errarg->finish_lsn));
		}
	}
}
6288 :
6289 : /* Set transaction information of apply error callback */
6290 : static inline void
6291 2937 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
6292 : {
6293 2937 : apply_error_callback_arg.remote_xid = xid;
6294 2937 : apply_error_callback_arg.finish_lsn = lsn;
6295 2937 : }
6296 :
6297 : /* Reset all information of apply error callback */
6298 : static inline void
6299 1436 : reset_apply_error_context_info(void)
6300 : {
6301 1436 : apply_error_callback_arg.command = 0;
6302 1436 : apply_error_callback_arg.rel = NULL;
6303 1436 : apply_error_callback_arg.remote_attnum = -1;
6304 1436 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
6305 1436 : }
6306 :
6307 : /*
6308 : * Request wakeup of the workers for the given subscription OID
6309 : * at commit of the current transaction.
6310 : *
6311 : * This is used to ensure that the workers process assorted changes
6312 : * as soon as possible.
6313 : */
6314 : void
6315 234 : LogicalRepWorkersWakeupAtCommit(Oid subid)
6316 : {
6317 : MemoryContext oldcxt;
6318 :
6319 234 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
6320 234 : on_commit_wakeup_workers_subids =
6321 234 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
6322 234 : MemoryContextSwitchTo(oldcxt);
6323 234 : }
6324 :
6325 : /*
6326 : * Wake up the workers of any subscriptions that were changed in this xact.
6327 : */
6328 : void
6329 559025 : AtEOXact_LogicalRepWorkers(bool isCommit)
6330 : {
6331 559025 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
6332 : {
6333 : ListCell *lc;
6334 :
6335 229 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
6336 458 : foreach(lc, on_commit_wakeup_workers_subids)
6337 : {
6338 229 : Oid subid = lfirst_oid(lc);
6339 : List *workers;
6340 : ListCell *lc2;
6341 :
6342 229 : workers = logicalrep_workers_find(subid, true, false);
6343 300 : foreach(lc2, workers)
6344 : {
6345 71 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
6346 :
6347 71 : logicalrep_worker_wakeup_ptr(worker);
6348 : }
6349 : }
6350 229 : LWLockRelease(LogicalRepWorkerLock);
6351 : }
6352 :
6353 : /* The List storage will be reclaimed automatically in xact cleanup. */
6354 559025 : on_commit_wakeup_workers_subids = NIL;
6355 559025 : }
6356 :
6357 : /*
6358 : * Allocate the origin name in long-lived context for error context message.
6359 : */
6360 : void
6361 445 : set_apply_error_context_origin(char *originname)
6362 : {
6363 445 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
6364 : originname);
6365 445 : }
6366 :
6367 : /*
6368 : * Return the action to be taken for the given transaction. See
6369 : * TransApplyAction for information on each of the actions.
6370 : *
6371 : * *winfo is assigned to the destination parallel worker info when the leader
6372 : * apply worker has to pass all the transaction's changes to the parallel
6373 : * apply worker.
6374 : */
6375 : static TransApplyAction
6376 326674 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
6377 : {
6378 326674 : *winfo = NULL;
6379 :
6380 326674 : if (am_parallel_apply_worker())
6381 : {
6382 69245 : return TRANS_PARALLEL_APPLY;
6383 : }
6384 :
6385 : /*
6386 : * If we are processing this transaction using a parallel apply worker
6387 : * then either we send the changes to the parallel worker or if the worker
6388 : * is busy then serialize the changes to the file which will later be
6389 : * processed by the parallel worker.
6390 : */
6391 257429 : *winfo = pa_find_worker(xid);
6392 :
6393 257429 : if (*winfo && (*winfo)->serialize_changes)
6394 : {
6395 5037 : return TRANS_LEADER_PARTIAL_SERIALIZE;
6396 : }
6397 252392 : else if (*winfo)
6398 : {
6399 68911 : return TRANS_LEADER_SEND_TO_PARALLEL;
6400 : }
6401 :
6402 : /*
6403 : * If there is no parallel worker involved to process this transaction
6404 : * then we either directly apply the change or serialize it to a file
6405 : * which will later be applied when the transaction finish message is
6406 : * processed.
6407 : */
6408 183481 : else if (in_streamed_transaction)
6409 : {
6410 103199 : return TRANS_LEADER_SERIALIZE;
6411 : }
6412 : else
6413 : {
6414 80282 : return TRANS_LEADER_APPLY;
6415 : }
6416 : }
|