Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server facing code and shares libpqwalreceiver
19 : * module with walreceiver for providing the libpq specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option as on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions has
33 : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which is then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in tmp file directory by default, and the filenames
38 : * include both the XID of the toplevel transaction and OID of the
39 : * subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID don't interfere.
41 : *
42 : * We use BufFiles instead of using normal temporary files because (a) the
43 : * BufFile infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) provides a way for automatic clean up on the error and (c) provides
45 : * a way to survive these files across local transactions and allows them to
46 : * be opened and closed at stream start and stop. We decided to use FileSet
47 : * infrastructure as without that it deletes the files on the closure of the
48 : * file and if we decide to keep stream files open across the start/stop stream
49 : * then it will consume a lot of memory (more than 8K for each BufFile and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we don't use FileSet then we also need to invent
53 : * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 : * the file we desired across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option as parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : * tablesync worker might not get such a prepared transaction because say it
72 : * was prior to the initial consistent point but might have got some later
73 : * commits. Now, the tablesync worker will exit without doing anything for the
74 : * prepared transaction skipped by the apply worker as the sync location for it
75 : * will be already ahead of the apply worker's current location. This would lead
76 : * to an "empty prepare", because later when the apply worker does the commit
77 : * prepare, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this, and similar prepare confusions the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * ProcessSyncingTablesForApply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING it starts streaming messages with the
98 : * two_phase option which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time we have not enabled two_phase, the
101 : * publisher (replication server) would have skipped some prepares but we
102 : * ensure that such prepares are sent along with commit prepare, see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * Finally, to avoid problems mentioned in previous paragraphs from any
113 : * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
114 : * to 'off' and then again back to 'on') there is a restriction for
115 : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 : * the two_phase tri-state is ENABLED, except when copy_data = false.
117 : *
118 : * We can get prepare of the same GID more than once for the genuine cases
119 : * where we have defined multiple subscriptions for publications on the same
120 : * server and prepared transaction has operations on tables subscribed to those
121 : * subscriptions. For such cases, if we use the GID sent by publisher one of
122 : * the prepares will be successful and others will fail, in which case the
123 : * server will send them again. Now, this can lead to a deadlock if user has
124 : * set synchronous_standby_names for all the subscriptions on subscriber. To
125 : * avoid such deadlocks, we generate a unique GID (consisting of the
126 : * subscription oid and the xid of the prepared transaction) for each prepare
127 : * transaction on the subscriber.
128 : *
129 : * FAILOVER
130 : * ----------------------
131 : * The logical slot on the primary can be synced to the standby by specifying
132 : * failover = true when creating the subscription. Enabling failover allows us
133 : * to smoothly transition to the promoted standby, ensuring that we can
134 : * subscribe to the new primary without losing any data.
135 : *
136 : * RETAIN DEAD TUPLES
137 : * ----------------------
138 : * Each apply worker that enabled retain_dead_tuples option maintains a
139 : * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
140 : * prevent dead rows from being removed prematurely when the apply worker still
141 : * needs them to detect update_deleted conflicts. Additionally, this helps to
142 : * retain the required commit_ts module information, which further helps to
143 : * detect update_origin_differs and delete_origin_differs conflicts reliably, as
144 : * otherwise, vacuum freeze could remove the required information.
145 : *
146 : * The logical replication launcher manages an internal replication slot named
147 : * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 : * transaction ID from all apply workers to determine the appropriate xmin for
149 : * the slot, thereby retaining necessary tuples.
150 : *
151 : * The non-removable transaction ID in the apply worker is advanced to the
152 : * oldest running transaction ID once all concurrent transactions on the
153 : * publisher have been applied and flushed locally. The process involves:
154 : *
155 : * - RDT_GET_CANDIDATE_XID:
156 : * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 : * candidate xid.
158 : *
159 : * - RDT_REQUEST_PUBLISHER_STATUS:
160 : * Send a message to the walsender requesting the publisher status, which
161 : * includes the latest WAL write position and information about transactions
162 : * that are in the commit phase.
163 : *
164 : * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 : * Wait for the status from the walsender. After receiving the first status,
166 : * do not proceed if there are concurrent remote transactions that are still
167 : * in the commit phase. These transactions might have been assigned an
168 : * earlier commit timestamp but have not yet written the commit WAL record.
169 : * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 : * until all these transactions have completed.
171 : *
172 : * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 : * Advance the non-removable transaction ID if the current flush location has
174 : * reached or surpassed the last received WAL position.
175 : *
176 : * - RDT_STOP_CONFLICT_INFO_RETENTION:
177 : * This phase is required only when max_retention_duration is defined. We
178 : * enter this phase if the wait time in either the
179 : * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
180 : * configured max_retention_duration. In this phase,
181 : * pg_subscription.subretentionactive is updated to false within a new
182 : * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
183 : *
184 : * - RDT_RESUME_CONFLICT_INFO_RETENTION:
185 : * This phase is required only when max_retention_duration is defined. We
186 : * enter this phase if the retention was previously stopped, and the time
187 : * required to advance the non-removable transaction ID in the
188 : * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
189 : * (or if max_retention_duration is set to 0). During this phase,
190 : * pg_subscription.subretentionactive is updated to true within a new
191 : * transaction, and the worker will be restarted.
192 : *
193 : * The overall state progression is: GET_CANDIDATE_XID ->
194 : * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
195 : * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
196 : * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
197 : *
198 : * Retaining the dead tuples for this period is sufficient for ensuring
199 : * eventual consistency using last-update-wins strategy, as dead tuples are
200 : * useful for detecting conflicts only during the application of concurrent
201 : * transactions from remote nodes. After applying and flushing all remote
202 : * transactions that occurred concurrently with the tuple DELETE, any
203 : * subsequent UPDATE from a remote node should have a later timestamp. In such
204 : * cases, it is acceptable to detect an update_missing scenario and convert the
205 : * UPDATE to an INSERT when applying it. But, for concurrent remote
206 : * transactions with earlier timestamps than the DELETE, detecting
207 : * update_deleted is necessary, as the UPDATEs in remote transactions should be
208 : * ignored if their timestamp is earlier than that of the dead tuples.
209 : *
210 : * Note that advancing the non-removable transaction ID is not supported if the
211 : * publisher is also a physical standby. This is because the logical walsender
212 : * on the standby can only get the WAL replay position but there may be more
213 : * WALs that are being replicated from the primary and those WALs could have
214 : * earlier commit timestamp.
215 : *
216 : * Similarly, when the publisher has subscribed to another publisher,
217 : * information necessary for conflict detection cannot be retained for
218 : * changes from origins other than the publisher. This is because publisher
219 : * lacks the information on concurrent transactions of other publishers to
220 : * which it subscribes. As the information on concurrent transactions is
221 : * unavailable beyond subscriber's immediate publishers, the non-removable
222 : * transaction ID might be advanced prematurely before changes from other
223 : * origins have been fully applied.
224 : *
225 : * XXX Retaining information for changes from other origins might be possible
226 : * by requesting the subscription on that origin to enable retain_dead_tuples
227 : * and fetching the conflict detection slot.xmin along with the publisher's
228 : * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
229 : * wait for the remote slot's xmin to reach the oldest active transaction ID,
230 : * ensuring that all transactions from other origins have been applied on the
231 : * publisher, thereby getting the latest WAL position that includes all
232 : * concurrent changes. However, this approach may impact performance, so it
233 : * might not be worth the effort.
234 : *
235 : * XXX It seems feasible to get the latest commit's WAL location from the
236 : * publisher and wait till that is applied. However, we can't do that
237 : * because commit timestamps can regress as a commit with a later LSN is not
238 : * guaranteed to have a later timestamp than those with earlier LSNs. Having
239 : * said that, even if that is possible, it won't improve performance much as
240 : * the apply always lags and moves slowly as compared with the transactions
241 : * on the publisher.
242 : *-------------------------------------------------------------------------
243 : */
244 :
245 : #include "postgres.h"
246 :
247 : #include <sys/stat.h>
248 : #include <unistd.h>
249 :
250 : #include "access/genam.h"
251 : #include "access/commit_ts.h"
252 : #include "access/table.h"
253 : #include "access/tableam.h"
254 : #include "access/tupconvert.h"
255 : #include "access/twophase.h"
256 : #include "access/xact.h"
257 : #include "catalog/indexing.h"
258 : #include "catalog/pg_inherits.h"
259 : #include "catalog/pg_subscription.h"
260 : #include "catalog/pg_subscription_rel.h"
261 : #include "commands/subscriptioncmds.h"
262 : #include "commands/tablecmds.h"
263 : #include "commands/trigger.h"
264 : #include "executor/executor.h"
265 : #include "executor/execPartition.h"
266 : #include "libpq/pqformat.h"
267 : #include "miscadmin.h"
268 : #include "optimizer/optimizer.h"
269 : #include "parser/parse_relation.h"
270 : #include "pgstat.h"
271 : #include "port/pg_bitutils.h"
272 : #include "postmaster/bgworker.h"
273 : #include "postmaster/interrupt.h"
274 : #include "postmaster/walwriter.h"
275 : #include "replication/conflict.h"
276 : #include "replication/logicallauncher.h"
277 : #include "replication/logicalproto.h"
278 : #include "replication/logicalrelation.h"
279 : #include "replication/logicalworker.h"
280 : #include "replication/origin.h"
281 : #include "replication/slot.h"
282 : #include "replication/walreceiver.h"
283 : #include "replication/worker_internal.h"
284 : #include "rewrite/rewriteHandler.h"
285 : #include "storage/buffile.h"
286 : #include "storage/ipc.h"
287 : #include "storage/latch.h"
288 : #include "storage/lmgr.h"
289 : #include "storage/procarray.h"
290 : #include "tcop/tcopprot.h"
291 : #include "utils/acl.h"
292 : #include "utils/guc.h"
293 : #include "utils/inval.h"
294 : #include "utils/lsyscache.h"
295 : #include "utils/memutils.h"
296 : #include "utils/pg_lsn.h"
297 : #include "utils/rel.h"
298 : #include "utils/rls.h"
299 : #include "utils/snapmgr.h"
300 : #include "utils/syscache.h"
301 : #include "utils/usercontext.h"
302 : #include "utils/wait_event.h"
303 :
304 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
305 :
306 : typedef struct FlushPosition
307 : {
308 : dlist_node node;
309 : XLogRecPtr local_end;
310 : XLogRecPtr remote_end;
311 : } FlushPosition;
312 :
313 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
314 :
315 : typedef struct ApplyExecutionData
316 : {
317 : EState *estate; /* executor state, used to track resources */
318 :
319 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
320 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
321 :
322 : /* These fields are used when the target relation is partitioned: */
323 : ModifyTableState *mtstate; /* dummy ModifyTable state */
324 : PartitionTupleRouting *proute; /* partition routing info */
325 : } ApplyExecutionData;
326 :
327 : /* Struct for saving and restoring apply errcontext information */
328 : typedef struct ApplyErrorCallbackArg
329 : {
330 : LogicalRepMsgType command; /* 0 if invalid */
331 : LogicalRepRelMapEntry *rel;
332 :
333 : /* Remote node information */
334 : int remote_attnum; /* -1 if invalid */
335 : TransactionId remote_xid;
336 : XLogRecPtr finish_lsn;
337 : char *origin_name;
338 : } ApplyErrorCallbackArg;
339 :
340 : /*
341 : * The action to be taken for the changes in the transaction.
342 : *
343 : * TRANS_LEADER_APPLY:
344 : * This action means that we are in the leader apply worker or table sync
345 : * worker. The changes of the transaction are either directly applied or
346 : * are read from temporary files (for streaming transactions) and then
347 : * applied by the worker.
348 : *
349 : * TRANS_LEADER_SERIALIZE:
350 : * This action means that we are in the leader apply worker or table sync
351 : * worker. Changes are written to temporary files and then applied when the
352 : * final commit arrives.
353 : *
354 : * TRANS_LEADER_SEND_TO_PARALLEL:
355 : * This action means that we are in the leader apply worker and need to send
356 : * the changes to the parallel apply worker.
357 : *
358 : * TRANS_LEADER_PARTIAL_SERIALIZE:
359 : * This action means that we are in the leader apply worker and have sent some
360 : * changes directly to the parallel apply worker and the remaining changes are
361 : * serialized to a file, due to timeout while sending data. The parallel apply
362 : * worker will apply these serialized changes when the final commit arrives.
363 : *
364 : * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
365 : * serializing changes, the leader worker also needs to serialize the
366 : * STREAM_XXX message to a file, and wait for the parallel apply worker to
367 : * finish the transaction when processing the transaction finish command. So
368 : * this new action was introduced to keep the code and logic clear.
369 : *
370 : * TRANS_PARALLEL_APPLY:
371 : * This action means that we are in the parallel apply worker and changes of
372 : * the transaction are applied directly by the worker.
373 : */
374 : typedef enum
375 : {
376 : /* The action for non-streaming transactions. */
377 : TRANS_LEADER_APPLY,
378 :
379 : /* Actions for streaming transactions. */
380 : TRANS_LEADER_SERIALIZE,
381 : TRANS_LEADER_SEND_TO_PARALLEL,
382 : TRANS_LEADER_PARTIAL_SERIALIZE,
383 : TRANS_PARALLEL_APPLY,
384 : } TransApplyAction;
385 :
386 : /*
387 : * The phases involved in advancing the non-removable transaction ID.
388 : *
389 : * See comments atop worker.c for details of the transition between these
390 : * phases.
391 : */
392 : typedef enum
393 : {
394 : RDT_GET_CANDIDATE_XID,
395 : RDT_REQUEST_PUBLISHER_STATUS,
396 : RDT_WAIT_FOR_PUBLISHER_STATUS,
397 : RDT_WAIT_FOR_LOCAL_FLUSH,
398 : RDT_STOP_CONFLICT_INFO_RETENTION,
399 : RDT_RESUME_CONFLICT_INFO_RETENTION,
400 : } RetainDeadTuplesPhase;
401 :
402 : /*
403 : * Critical information for managing phase transitions within the
404 : * RetainDeadTuplesPhase.
405 : */
406 : typedef struct RetainDeadTuplesData
407 : {
408 : RetainDeadTuplesPhase phase; /* current phase */
409 : XLogRecPtr remote_lsn; /* WAL write position on the publisher */
410 :
411 : /*
412 : * Oldest transaction ID that was in the commit phase on the publisher.
413 : * Use FullTransactionId to prevent issues with transaction ID wraparound,
414 : * where a new remote_oldestxid could falsely appear to originate from the
415 : * past and block advancement.
416 : */
417 : FullTransactionId remote_oldestxid;
418 :
419 : /*
420 : * Next transaction ID to be assigned on the publisher. Use
421 : * FullTransactionId for consistency and to allow straightforward
422 : * comparisons with remote_oldestxid.
423 : */
424 : FullTransactionId remote_nextxid;
425 :
426 : TimestampTz reply_time; /* when the publisher responds with status */
427 :
428 : /*
429 : * Publisher transaction ID that must be awaited to complete before
430 : * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
431 : * FullTransactionId for the same reason as remote_nextxid.
432 : */
433 : FullTransactionId remote_wait_for;
434 :
435 : TransactionId candidate_xid; /* candidate for the non-removable
436 : * transaction ID */
437 : TimestampTz flushpos_update_time; /* when the remote flush position was
438 : * updated in final phase
439 : * (RDT_WAIT_FOR_LOCAL_FLUSH) */
440 :
441 : long table_sync_wait_time; /* time spent waiting for table sync
442 : * to finish */
443 :
444 : /*
445 : * The following fields are used to determine the timing for the next
446 : * round of transaction ID advancement.
447 : */
448 : TimestampTz last_recv_time; /* when the last message was received */
449 : TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
450 : int xid_advance_interval; /* how much time (ms) to wait before
451 : * attempting to advance the
452 : * non-removable transaction ID */
453 : } RetainDeadTuplesData;
454 :
455 : /*
456 : * The minimum (100ms) and maximum (3 minutes) intervals for advancing
457 : * non-removable transaction IDs. The maximum interval is a bit arbitrary but
458 : * is sufficient to not cause any undue network traffic.
459 : */
460 : #define MIN_XID_ADVANCE_INTERVAL 100
461 : #define MAX_XID_ADVANCE_INTERVAL 180000
462 :
463 : /* errcontext tracker */
464 : static ApplyErrorCallbackArg apply_error_callback_arg =
465 : {
466 : .command = 0,
467 : .rel = NULL,
468 : .remote_attnum = -1,
469 : .remote_xid = InvalidTransactionId,
470 : .finish_lsn = InvalidXLogRecPtr,
471 : .origin_name = NULL,
472 : };
473 :
474 : ErrorContextCallback *apply_error_context_stack = NULL;
475 :
476 : MemoryContext ApplyMessageContext = NULL;
477 : MemoryContext ApplyContext = NULL;
478 :
479 : /* per stream context for streaming transactions */
480 : static MemoryContext LogicalStreamingContext = NULL;
481 :
482 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
483 :
484 : Subscription *MySubscription = NULL;
485 : static bool MySubscriptionValid = false;
486 :
487 : static List *on_commit_wakeup_workers_subids = NIL;
488 :
489 : bool in_remote_transaction = false;
490 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
491 :
492 : /* fields valid only when processing streamed transaction */
493 : static bool in_streamed_transaction = false;
494 :
495 : static TransactionId stream_xid = InvalidTransactionId;
496 :
497 : /*
498 : * The number of changes applied by parallel apply worker during one streaming
499 : * block.
500 : */
501 : static uint32 parallel_stream_nchanges = 0;
502 :
503 : /* Are we initializing an apply worker? */
504 : bool InitializingApplyWorker = false;
505 :
506 : /*
507 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
508 : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
509 : * Once we start skipping changes, we don't stop it until we skip all changes of
510 : * the transaction even if pg_subscription is updated and MySubscription->skiplsn
511 : * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
512 : * we don't skip receiving and spooling the changes since we decide whether or not
513 : * to skip applying the changes when starting to apply changes. The subskiplsn is
514 : * cleared after successfully skipping the transaction or applying non-empty
515 : * transaction. The latter prevents the mistakenly specified subskiplsn from
516 : * being left. Note that we cannot skip the streaming transactions when using
517 : * parallel apply workers because we cannot get the finish LSN before applying
518 : * the changes. So, we don't start parallel apply worker when finish LSN is set
519 : * by the user.
520 : */
521 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
522 : #define is_skipping_changes() (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))
523 :
524 : /* BufFile handle of the current streaming file */
525 : static BufFile *stream_fd = NULL;
526 :
527 : /*
528 : * The remote WAL position that has been applied and flushed locally. We record
529 : * and use this information both while sending feedback to the server and
530 : * advancing oldest_nonremovable_xid.
531 : */
532 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
533 :
534 : typedef struct SubXactInfo
535 : {
536 : TransactionId xid; /* XID of the subxact */
537 : int fileno; /* file number in the buffile */
538 : pgoff_t offset; /* offset in the file */
539 : } SubXactInfo;
540 :
541 : /* Sub-transaction data for the current streaming transaction */
542 : typedef struct ApplySubXactData
543 : {
544 : uint32 nsubxacts; /* number of sub-transactions */
545 : uint32 nsubxacts_max; /* current capacity of subxacts */
546 : TransactionId subxact_last; /* xid of the last sub-transaction */
547 : SubXactInfo *subxacts; /* sub-xact offset in changes file */
548 : } ApplySubXactData;
549 :
550 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
551 :
552 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
553 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
554 :
555 : /*
556 : * Information about subtransactions of a given toplevel transaction.
557 : */
558 : static void subxact_info_write(Oid subid, TransactionId xid);
559 : static void subxact_info_read(Oid subid, TransactionId xid);
560 : static void subxact_info_add(TransactionId xid);
561 : static inline void cleanup_subxact_info(void);
562 :
563 : /*
564 : * Serialize and deserialize changes for a toplevel transaction.
565 : */
566 : static void stream_open_file(Oid subid, TransactionId xid,
567 : bool first_segment);
568 : static void stream_write_change(char action, StringInfo s);
569 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
570 : static void stream_close_file(void);
571 :
572 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
573 :
574 : static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
575 : bool status_received);
576 : static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
577 : static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
578 : bool status_received);
579 : static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
580 : static void request_publisher_status(RetainDeadTuplesData *rdt_data);
581 : static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
582 : bool status_received);
583 : static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
584 : static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
585 : static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
586 : static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data);
587 : static bool update_retention_status(bool active);
588 : static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
589 : static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
590 : bool new_xid_found);
591 :
592 : static void apply_worker_exit(void);
593 :
594 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
595 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
596 : ResultRelInfo *relinfo,
597 : TupleTableSlot *remoteslot);
598 : static void apply_handle_update_internal(ApplyExecutionData *edata,
599 : ResultRelInfo *relinfo,
600 : TupleTableSlot *remoteslot,
601 : LogicalRepTupleData *newtup,
602 : Oid localindexoid);
603 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
604 : ResultRelInfo *relinfo,
605 : TupleTableSlot *remoteslot,
606 : Oid localindexoid);
607 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
608 : LogicalRepRelation *remoterel,
609 : Oid localidxoid,
610 : TupleTableSlot *remoteslot,
611 : TupleTableSlot **localslot);
612 : static bool FindDeletedTupleInLocalRel(Relation localrel,
613 : Oid localidxoid,
614 : TupleTableSlot *remoteslot,
615 : TransactionId *delete_xid,
616 : ReplOriginId *delete_origin,
617 : TimestampTz *delete_time);
618 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
619 : TupleTableSlot *remoteslot,
620 : LogicalRepTupleData *newtup,
621 : CmdType operation);
622 :
623 : /* Functions for skipping changes */
624 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
625 : static void stop_skipping_changes(void);
626 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
627 :
628 : /* Functions for apply error callback */
629 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
630 : static inline void reset_apply_error_context_info(void);
631 :
632 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
633 : ParallelApplyWorkerInfo **winfo);
634 :
635 : static void set_wal_receiver_timeout(void);
636 :
637 : static void on_exit_clear_xact_state(int code, Datum arg);
638 :
639 : /*
640 : * Form the origin name for the subscription.
641 : *
642 : * This is a common function for tablesync and other workers. Tablesync workers
643 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
644 : *
645 : * Return the name in the supplied buffer.
646 : */
647 : void
648 1495 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
649 : char *originname, Size szoriginname)
650 : {
651 1495 : if (OidIsValid(relid))
652 : {
653 : /* Replication origin name for tablesync workers. */
654 812 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
655 : }
656 : else
657 : {
658 : /* Replication origin name for non-tablesync workers. */
659 683 : snprintf(originname, szoriginname, "pg_%u", suboid);
660 : }
661 1495 : }
662 :
663 : /*
664 : * Should this worker apply changes for given relation.
665 : *
666 : * This is mainly needed for initial relation data sync as that runs in
667 : * separate worker process running in parallel and we need some way to skip
668 : * changes coming to the leader apply worker during the sync of a table.
669 : *
670 : * Note we need to do smaller or equals comparison for SYNCDONE state because
671 : * it might hold position of end of initial slot consistent point WAL
672 : * record + 1 (ie start of next record) and next record can be COMMIT of
673 : * transaction we are now processing (which is what we set remote_final_lsn
674 : * to in apply_handle_begin).
675 : *
676 : * Note that for streaming transactions that are being applied in the parallel
677 : * apply worker, we disallow applying changes if the target table in the
678 : * subscription is not in the READY state, because we cannot decide whether to
679 : * apply the change as we won't know remote_final_lsn by that time.
680 : *
681 : * We already checked this in pa_can_start() before assigning the
682 : * streaming transaction to the parallel worker, but it also needs to be
683 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
684 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
685 : * while applying this transaction.
686 : */
687 : static bool
688 168847 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
689 : {
690 168847 : switch (MyLogicalRepWorker->type)
691 : {
692 0 : case WORKERTYPE_TABLESYNC:
 : /* Tablesync workers apply changes only for their own table. */
693 0 : return MyLogicalRepWorker->relid == rel->localreloid;
694 :
695 68368 : case WORKERTYPE_PARALLEL_APPLY:
696 : /* We don't synchronize rel's that are in unknown state. */
697 68368 : if (rel->state != SUBREL_STATE_READY &&
698 0 : rel->state != SUBREL_STATE_UNKNOWN)
699 0 : ereport(ERROR,
700 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
701 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
702 : MySubscription->name),
703 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
704 :
 : /* SUBREL_STATE_UNKNOWN (rel not subscribed) evaluates to false here. */
705 68368 : return rel->state == SUBREL_STATE_READY;
706 :
707 100479 : case WORKERTYPE_APPLY:
 : /* See function header for why SYNCDONE uses <= on statelsn. */
708 100545 : return (rel->state == SUBREL_STATE_READY ||
709 66 : (rel->state == SUBREL_STATE_SYNCDONE &&
710 9 : rel->statelsn <= remote_final_lsn));
711 :
712 0 : case WORKERTYPE_SEQUENCESYNC:
713 : /* Should never happen. */
714 0 : elog(ERROR, "sequence synchronization worker is not expected to apply changes");
715 : break;
716 :
717 0 : case WORKERTYPE_UNKNOWN:
718 : /* Should never happen. */
719 0 : elog(ERROR, "Unknown worker type");
720 : }
721 :
722 0 : return false; /* dummy for compiler */
723 : }
724 :
725 : /*
726 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
727 : *
728 : * Start a transaction, if this is the first step (else we keep using the
729 : * existing transaction).
730 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
731 : */
732 : static void
733 169315 : begin_replication_step(void)
734 : {
735 169315 : SetCurrentStatementStartTimestamp();
736 :
737 169315 : if (!IsTransactionState())
738 : {
739 1022 : StartTransactionCommand();
 : /*
 : * NOTE(review): maybe_reread_subscription() appears not to return on
 : * some paths (execution counts drop below) -- presumably it can
 : * error/exit when the subscription changed; confirm against its
 : * definition.
 : */
740 1022 : maybe_reread_subscription();
741 : }
742 :
743 169312 : PushActiveSnapshot(GetTransactionSnapshot());
744 :
745 169312 : MemoryContextSwitchTo(ApplyMessageContext);
746 169312 : }
747 :
748 : /*
749 : * Finish up one step of a replication transaction.
750 : * Callers of begin_replication_step() must also call this.
751 : *
752 : * We don't close out the transaction here, but we should increment
753 : * the command counter to make the effects of this step visible.
754 : */
755 : static void
756 169231 : end_replication_step(void)
757 : {
 : /* Pop the snapshot pushed by begin_replication_step(). */
758 169231 : PopActiveSnapshot();
759 :
 : /* Make this step's changes visible to subsequent steps. */
760 169231 : CommandCounterIncrement();
761 169231 : }
762 :
763 : /*
764 : * Handle streamed transactions for both the leader apply worker and the
765 : * parallel apply workers.
766 : *
767 : * In the streaming case (receiving a block of the streamed transaction), for
768 : * serialize mode, simply redirect it to a file for the proper toplevel
769 : * transaction, and for parallel mode, the leader apply worker will send the
770 : * changes to parallel apply workers and the parallel apply worker will define
771 : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
772 : * messages will be applied by both leader apply worker and parallel apply
773 : * workers).
774 : *
775 : * Returns true for streamed transactions (when the change is either serialized
776 : * to file or sent to parallel apply worker), false otherwise (regular mode or
777 : * needs to be processed by parallel apply worker).
778 : *
779 : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
780 : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
781 : * to a parallel apply worker.
782 : */
783 : static bool
784 345237 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
785 : {
786 : TransactionId current_xid;
787 : ParallelApplyWorkerInfo *winfo;
788 : TransApplyAction apply_action;
789 : StringInfoData original_msg;
790 :
791 345237 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
792 :
793 : /* not in streaming mode */
794 345237 : if (apply_action == TRANS_LEADER_APPLY)
795 100928 : return false;
796 :
797 : Assert(TransactionIdIsValid(stream_xid));
798 :
799 : /*
800 : * The parallel apply worker needs the xid in this message to decide
801 : * whether to define a savepoint, so save the original message that has
802 : * not moved the cursor after the xid. We will serialize this message to a
803 : * file in PARTIAL_SERIALIZE mode.
804 : */
 : /* Shallow struct copy: shares s's buffer but preserves the current cursor. */
805 244309 : original_msg = *s;
806 :
807 : /*
808 : * We should have received XID of the subxact as the first part of the
809 : * message, so extract it.
810 : */
811 244309 : current_xid = pq_getmsgint(s, 4);
812 :
813 244309 : if (!TransactionIdIsValid(current_xid))
814 0 : ereport(ERROR,
815 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
816 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
817 :
818 244309 : switch (apply_action)
819 : {
820 102512 : case TRANS_LEADER_SERIALIZE:
821 : Assert(stream_fd);
822 :
823 : /* Add the new subxact to the array (unless already there). */
824 102512 : subxact_info_add(current_xid);
825 :
826 : /* Write the change to the current file */
827 102512 : stream_write_change(action, s);
828 102512 : return true;
829 :
830 68392 : case TRANS_LEADER_SEND_TO_PARALLEL:
831 : Assert(winfo);
832 :
833 : /*
834 : * XXX The publisher side doesn't always send relation/type update
835 : * messages after the streaming transaction, so also update the
836 : * relation/type in leader apply worker. See function
837 : * cleanup_rel_sync_cache.
838 : */
839 68392 : if (pa_send_data(winfo, s->len, s->data))
840 68392 : return (action != LOGICAL_REP_MSG_RELATION &&
841 : action != LOGICAL_REP_MSG_TYPE);
842 :
843 : /*
844 : * Switch to serialize mode when we are not able to send the
845 : * change to parallel apply worker.
846 : */
847 0 : pa_switch_to_partial_serialize(winfo, false);
848 :
849 : pg_fallthrough;
850 5006 : case TRANS_LEADER_PARTIAL_SERIALIZE:
 : /* Write original_msg (cursor still before the xid), not the consumed s. */
851 5006 : stream_write_change(action, &original_msg);
852 :
853 : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
854 5006 : return (action != LOGICAL_REP_MSG_RELATION &&
855 : action != LOGICAL_REP_MSG_TYPE);
856 :
857 68399 : case TRANS_PARALLEL_APPLY:
858 68399 : parallel_stream_nchanges += 1;
859 :
860 : /* Define a savepoint for a subxact if needed. */
861 68399 : pa_start_subtrans(current_xid, stream_xid);
862 68399 : return false;
863 :
864 0 : default:
865 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
866 : return false; /* silence compiler warning */
867 : }
868 : }
869 :
870 : /*
871 : * Executor state preparation for evaluation of constraint expressions,
872 : * indexes and triggers for the specified relation.
873 : *
874 : * Note that the caller must open and close any indexes to be updated.
875 : */
876 : static ApplyExecutionData *
877 168760 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
878 : {
879 : ApplyExecutionData *edata;
880 : EState *estate;
881 : RangeTblEntry *rte;
882 168760 : List *perminfos = NIL;
883 : ResultRelInfo *resultRelInfo;
884 :
885 168760 : edata = palloc0_object(ApplyExecutionData);
886 168760 : edata->targetRel = rel;
887 :
888 168760 : edata->estate = estate = CreateExecutorState();
889 :
890 168760 : rte = makeNode(RangeTblEntry);
891 168760 : rte->rtekind = RTE_RELATION;
892 168760 : rte->relid = RelationGetRelid(rel->localrel);
893 168760 : rte->relkind = rel->localrel->rd_rel->relkind;
894 168760 : rte->rellockmode = AccessShareLock;
895 :
896 168760 : addRTEPermissionInfo(&perminfos, rte);
897 :
 : /* Single-entry range table: the target relation is RT index 1. */
898 168760 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
899 : bms_make_singleton(1));
900 :
901 168760 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
902 :
903 : /*
904 : * Use Relation opened by logicalrep_rel_open() instead of opening it
905 : * again.
906 : */
907 168760 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
908 :
909 : /*
910 : * We put the ResultRelInfo in the es_opened_result_relations list, even
911 : * though we don't populate the es_result_relations array. That's a bit
912 : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
913 : *
914 : * ExecOpenIndices() is not called here either, each execution path doing
915 : * an apply operation being responsible for that.
916 : */
917 168760 : estate->es_opened_result_relations =
918 168760 : lappend(estate->es_opened_result_relations, resultRelInfo);
919 :
920 168760 : estate->es_output_cid = GetCurrentCommandId(true);
921 :
922 : /* Prepare to catch AFTER triggers. */
923 168760 : AfterTriggerBeginQuery();
924 :
925 : /* other fields of edata remain NULL for now */
926 :
927 168760 : return edata;
928 : }
929 :
930 : /*
931 : * Finish any operations related to the executor state created by
932 : * create_edata_for_relation().
933 : */
934 : static void
935 168694 : finish_edata(ApplyExecutionData *edata)
936 : {
937 168694 : EState *estate = edata->estate;
938 :
939 : /* Handle any queued AFTER triggers. */
 : /* Counterpart of AfterTriggerBeginQuery() in create_edata_for_relation(). */
940 168694 : AfterTriggerEndQuery(estate);
941 :
942 : /* Shut down tuple routing, if any was done. */
943 168694 : if (edata->proute)
944 74 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
945 :
946 : /*
947 : * Cleanup. It might seem that we should call ExecCloseResultRelations()
948 : * here, but we intentionally don't. It would close the rel we added to
949 : * es_opened_result_relations above, which is wrong because we took no
950 : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
951 : * any other relations opened during execution.
952 : */
953 168694 : ExecResetTupleTable(estate->es_tupleTable, false);
954 168694 : FreeExecutorState(estate);
955 168694 : pfree(edata);
956 168694 : }
957 :
958 : /*
959 : * Executes default values for columns for which we can't map to remote
960 : * relation columns.
961 : *
962 : * This allows us to support tables which have more columns on the downstream
963 : * than on the upstream.
964 : */
965 : static void
966 96485 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
967 : TupleTableSlot *slot)
968 : {
969 96485 : TupleDesc desc = RelationGetDescr(rel->localrel);
970 96485 : int num_phys_attrs = desc->natts;
971 : int i;
972 : int attnum,
973 96485 : num_defaults = 0;
974 : int *defmap;
975 : ExprState **defexprs;
976 : ExprContext *econtext;
977 :
978 96485 : econtext = GetPerTupleExprContext(estate);
979 :
980 : /* We got all the data via replication, no need to evaluate anything. */
981 96485 : if (num_phys_attrs == rel->remoterel.natts)
982 56340 : return;
983 :
 : /* Sized for the worst case (every column defaulted); not explicitly freed. */
984 40145 : defmap = palloc_array(int, num_phys_attrs)();
985 40145 : defexprs = palloc_array(ExprState *, num_phys_attrs);
986 :
987 : Assert(rel->attrmap->maplen == num_phys_attrs);
988 210663 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
989 : {
990 170518 : CompactAttribute *cattr = TupleDescCompactAttr(desc, attnum);
991 : Expr *defexpr;
992 :
 : /* Dropped and generated columns never get a default evaluated here. */
993 170518 : if (cattr->attisdropped || cattr->attgenerated)
994 9 : continue;
995 :
 : /* Column has a remote counterpart; its value came via replication. */
996 170509 : if (rel->attrmap->attnums[attnum] >= 0)
997 92268 : continue;
998 :
999 78241 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
1000 :
1001 78241 : if (defexpr != NULL)
1002 : {
1003 : /* Run the expression through planner */
1004 70131 : defexpr = expression_planner(defexpr);
1005 :
1006 : /* Initialize executable expression in copycontext */
1007 70131 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1008 70131 : defmap[num_defaults] = attnum;
1009 70131 : num_defaults++;
1010 : }
1011 : }
1012 :
1013 110276 : for (i = 0; i < num_defaults; i++)
1014 70131 : slot->tts_values[defmap[i]] =
1015 70131 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1016 : }
1017 :
1018 : /*
1019 : * Store tuple data into slot.
1020 : *
1021 : * Incoming data can be either text or binary format.
1022 : */
1023 : static void
1024 168774 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
1025 : LogicalRepTupleData *tupleData)
1026 : {
1027 168774 : int natts = slot->tts_tupleDescriptor->natts;
1028 : int i;
1029 :
1030 168774 : ExecClearTuple(slot);
1031 :
1032 : /* Call the "in" function for each non-dropped, non-null attribute */
1033 : Assert(natts == rel->attrmap->maplen);
1034 699261 : for (i = 0; i < natts; i++)
1035 : {
1036 530487 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
 : /* Negative when this local column has no matching remote column. */
1037 530487 : int remoteattnum = rel->attrmap->attnums[i];
1038 :
1039 530487 : if (!att->attisdropped && remoteattnum >= 0)
1040 323570 : {
1041 323570 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1042 :
1043 : Assert(remoteattnum < tupleData->ncols);
1044 :
1045 : /* Set attnum for error callback */
1046 323570 : apply_error_callback_arg.remote_attnum = remoteattnum;
1047 :
1048 323570 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1049 : {
1050 : Oid typinput;
1051 : Oid typioparam;
1052 :
1053 163254 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1054 326508 : slot->tts_values[i] =
1055 163254 : OidInputFunctionCall(typinput, colvalue->data,
1056 : typioparam, att->atttypmod);
1057 163254 : slot->tts_isnull[i] = false;
1058 : }
1059 160316 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1060 : {
1061 : Oid typreceive;
1062 : Oid typioparam;
1063 :
1064 : /*
1065 : * In some code paths we may be asked to re-parse the same
1066 : * tuple data. Reset the StringInfo's cursor so that works.
1067 : */
1068 109980 : colvalue->cursor = 0;
1069 :
1070 109980 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1071 219960 : slot->tts_values[i] =
1072 109980 : OidReceiveFunctionCall(typreceive, colvalue,
1073 : typioparam, att->atttypmod);
1074 :
1075 : /* Trouble if it didn't eat the whole buffer */
1076 109980 : if (colvalue->cursor != colvalue->len)
1077 0 : ereport(ERROR,
1078 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1079 : errmsg("incorrect binary data format in logical replication column %d",
1080 : remoteattnum + 1)));
1081 109980 : slot->tts_isnull[i] = false;
1082 : }
1083 : else
1084 : {
1085 : /*
1086 : * NULL value from remote. (We don't expect to see
1087 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1088 : * NULL.)
1089 : */
1090 50336 : slot->tts_values[i] = (Datum) 0;
1091 50336 : slot->tts_isnull[i] = true;
1092 : }
1093 :
1094 : /* Reset attnum for error callback */
1095 323570 : apply_error_callback_arg.remote_attnum = -1;
1096 : }
1097 : else
1098 : {
1099 : /*
1100 : * We assign NULL to dropped attributes and missing values
1101 : * (missing values should be later filled using
1102 : * slot_fill_defaults).
1103 : */
1104 206917 : slot->tts_values[i] = (Datum) 0;
1105 206917 : slot->tts_isnull[i] = true;
1106 : }
1107 : }
1108 :
1109 168774 : ExecStoreVirtualTuple(slot);
1110 168774 : }
1111 :
1112 : /*
1113 : * Replace updated columns with data from the LogicalRepTupleData struct.
1114 : * This is somewhat similar to heap_modify_tuple but also calls the type
1115 : * input functions on the user data.
1116 : *
1117 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
1118 : * columns provided in "tupleData" and leaving others as-is.
1119 : *
1120 : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
1121 : * storage for "srcslot". This is OK for current usage, but someday we may
1122 : * need to materialize "slot" at the end to make it independent of "srcslot".
1123 : */
1124 : static void
1125 31930 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
1126 : LogicalRepRelMapEntry *rel,
1127 : LogicalRepTupleData *tupleData)
1128 : {
1129 31930 : int natts = slot->tts_tupleDescriptor->natts;
1130 : int i;
1131 :
1132 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1133 31930 : ExecClearTuple(slot);
1134 :
1135 : /*
1136 : * Copy all the column data from srcslot, so that we'll have valid values
1137 : * for unreplaced columns.
1138 : */
1139 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
1140 31930 : slot_getallattrs(srcslot);
1141 31930 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1142 31930 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1143 :
1144 : /* Call the "in" function for each replaced attribute */
1145 : Assert(natts == rel->attrmap->maplen);
1146 159304 : for (i = 0; i < natts; i++)
1147 : {
1148 127374 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1149 127374 : int remoteattnum = rel->attrmap->attnums[i];
1150 :
 : /* Column not replicated: keep the value already copied from srcslot. */
1151 127374 : if (remoteattnum < 0)
1152 58519 : continue;
1153 :
1154 : Assert(remoteattnum < tupleData->ncols);
1155 :
 : /* UNCHANGED (e.g. untouched TOAST) also keeps the srcslot value. */
1156 68855 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1157 : {
1158 68855 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1159 :
1160 : /* Set attnum for error callback */
1161 68855 : apply_error_callback_arg.remote_attnum = remoteattnum;
1162 :
1163 68855 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1164 : {
1165 : Oid typinput;
1166 : Oid typioparam;
1167 :
1168 25451 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1169 50902 : slot->tts_values[i] =
1170 25451 : OidInputFunctionCall(typinput, colvalue->data,
1171 : typioparam, att->atttypmod);
1172 25451 : slot->tts_isnull[i] = false;
1173 : }
1174 43404 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1175 : {
1176 : Oid typreceive;
1177 : Oid typioparam;
1178 :
1179 : /*
1180 : * In some code paths we may be asked to re-parse the same
1181 : * tuple data. Reset the StringInfo's cursor so that works.
1182 : */
1183 43356 : colvalue->cursor = 0;
1184 :
1185 43356 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1186 86712 : slot->tts_values[i] =
1187 43356 : OidReceiveFunctionCall(typreceive, colvalue,
1188 : typioparam, att->atttypmod);
1189 :
1190 : /* Trouble if it didn't eat the whole buffer */
1191 43356 : if (colvalue->cursor != colvalue->len)
1192 0 : ereport(ERROR,
1193 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1194 : errmsg("incorrect binary data format in logical replication column %d",
1195 : remoteattnum + 1)));
1196 43356 : slot->tts_isnull[i] = false;
1197 : }
1198 : else
1199 : {
1200 : /* must be LOGICALREP_COLUMN_NULL */
1201 48 : slot->tts_values[i] = (Datum) 0;
1202 48 : slot->tts_isnull[i] = true;
1203 : }
1204 :
1205 : /* Reset attnum for error callback */
1206 68855 : apply_error_callback_arg.remote_attnum = -1;
1207 : }
1208 : }
1209 :
1210 : /* And finally, declare that "slot" contains a valid virtual tuple */
1211 31930 : ExecStoreVirtualTuple(slot);
1212 31930 : }
1213 :
1214 : /*
1215 : * Handle BEGIN message.
1216 : */
1217 : static void
1218 542 : apply_handle_begin(StringInfo s)
1219 : {
1220 : LogicalRepBeginData begin_data;
1221 :
1222 : /* There must not be an active streaming transaction. */
1223 : Assert(!TransactionIdIsValid(stream_xid));
1224 :
1225 542 : logicalrep_read_begin(s, &begin_data);
1226 542 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1227 :
 : /* Consumed by should_apply_changes_for_rel() (see its header comment). */
1228 542 : remote_final_lsn = begin_data.final_lsn;
1229 :
1230 542 : maybe_start_skipping_changes(begin_data.final_lsn);
1231 :
1232 542 : in_remote_transaction = true;
1233 :
1234 542 : pgstat_report_activity(STATE_RUNNING, NULL);
1235 : }
1236 :
1237 : /*
1238 : * Handle COMMIT message.
1239 : *
1240 : * TODO, support tracking of multiple origins
1241 : */
1242 : static void
1243 460 : apply_handle_commit(StringInfo s)
1244 : {
1245 : LogicalRepCommitData commit_data;
1246 :
1247 460 : logicalrep_read_commit(s, &commit_data);
1248 :
 : /* Sanity check against the final_lsn announced in BEGIN. */
1249 460 : if (commit_data.commit_lsn != remote_final_lsn)
1250 0 : ereport(ERROR,
1251 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1252 : errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1253 : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1254 : LSN_FORMAT_ARGS(remote_final_lsn))));
1255 :
1256 460 : apply_handle_commit_internal(&commit_data);
1257 :
1258 : /*
1259 : * Process any tables that are being synchronized in parallel, as well as
1260 : * any newly added tables or sequences.
1261 : */
1262 460 : ProcessSyncingRelations(commit_data.end_lsn);
1263 :
1264 460 : pgstat_report_activity(STATE_IDLE, NULL);
1265 460 : reset_apply_error_context_info();
1266 : }
1267 :
1268 : /*
1269 : * Handle BEGIN PREPARE message.
1270 : */
1271 : static void
1272 17 : apply_handle_begin_prepare(StringInfo s)
1273 : {
1274 : LogicalRepPreparedTxnData begin_data;
1275 :
1276 : /* Tablesync should never receive prepare. */
1277 17 : if (am_tablesync_worker())
1278 0 : ereport(ERROR,
1279 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1280 : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1281 :
1282 : /* There must not be an active streaming transaction. */
1283 : Assert(!TransactionIdIsValid(stream_xid));
1284 :
1285 17 : logicalrep_read_begin_prepare(s, &begin_data);
1286 17 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1287 :
 : /* For two-phase txns the prepare LSN plays the role of final_lsn. */
1288 17 : remote_final_lsn = begin_data.prepare_lsn;
1289 :
1290 17 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1291 :
1292 17 : in_remote_transaction = true;
1293 :
1294 17 : pgstat_report_activity(STATE_RUNNING, NULL);
1295 : }
1296 :
1297 : /*
1298 : * Common function to prepare the GID.
1299 : */
1300 : static void
1301 27 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1302 : {
1303 : char gid[GIDSIZE];
1304 :
1305 : /*
1306 : * Compute unique GID for two_phase transactions. We don't use GID of
1307 : * prepared transaction sent by server as that can lead to deadlock when
1308 : * we have multiple subscriptions from same node point to publications on
1309 : * the same node. See comments atop worker.c
1310 : */
1311 27 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1312 : gid, sizeof(gid));
1313 :
1314 : /*
1315 : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1316 : * called within the PrepareTransactionBlock below.
1317 : */
1318 27 : if (!IsTransactionBlock())
1319 : {
1320 27 : BeginTransactionBlock();
1321 27 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1322 : }
1323 :
1324 : /*
1325 : * Update origin state so we can restart streaming from correct position
1326 : * in case of crash.
1327 : */
1328 27 : replorigin_xact_state.origin_lsn = prepare_data->end_lsn;
1329 27 : replorigin_xact_state.origin_timestamp = prepare_data->prepare_time;
1330 :
 : /* Issues the actual PREPARE TRANSACTION under the computed GID. */
1331 27 : PrepareTransactionBlock(gid);
1332 : }
1333 :
1334 : /*
1335 : * Handle PREPARE message.
1336 : */
1337 : static void
1338 16 : apply_handle_prepare(StringInfo s)
1339 : {
1340 : LogicalRepPreparedTxnData prepare_data;
1341 :
1342 16 : logicalrep_read_prepare(s, &prepare_data);
1343 :
 : /* Sanity check against the prepare_lsn announced in BEGIN PREPARE. */
1344 16 : if (prepare_data.prepare_lsn != remote_final_lsn)
1345 0 : ereport(ERROR,
1346 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1347 : errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1348 : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1349 : LSN_FORMAT_ARGS(remote_final_lsn))));
1350 :
1351 : /*
1352 : * Unlike commit, here, we always prepare the transaction even though no
1353 : * change has happened in this transaction or all changes are skipped. It
1354 : * is done this way because at commit prepared time, we won't know whether
1355 : * we have skipped preparing a transaction because of those reasons.
1356 : *
1357 : * XXX, We can optimize such that at commit prepared time, we first check
1358 : * whether we have prepared the transaction or not but that doesn't seem
1359 : * worthwhile because such cases shouldn't be common.
1360 : */
1361 16 : begin_replication_step();
1362 :
1363 16 : apply_handle_prepare_internal(&prepare_data);
1364 :
1365 16 : end_replication_step();
1366 16 : CommitTransactionCommand();
1367 15 : pgstat_report_stat(false);
1368 :
1369 : /*
1370 : * It is okay not to set the local_end LSN for the prepare because we
1371 : * always flush the prepare record. So, we can send the acknowledgment of
1372 : * the remote_end LSN as soon as prepare is finished.
1373 : *
1374 : * XXX For the sake of consistency with commit, we could have set it with
1375 : * the LSN of prepare but as of now we don't track that value similar to
1376 : * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
1377 : * it.
1378 : */
1379 15 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1380 :
1381 15 : in_remote_transaction = false;
1382 :
1383 : /*
1384 : * Process any tables that are being synchronized in parallel, as well as
1385 : * any newly added tables or sequences.
1386 : */
1387 15 : ProcessSyncingRelations(prepare_data.end_lsn);
1388 :
1389 : /*
1390 : * Since we have already prepared the transaction, in a case where the
1391 : * server crashes before clearing the subskiplsn, it will be left but the
1392 : * transaction won't be resent. But that's okay because it's a rare case
1393 : * and the subskiplsn will be cleared when finishing the next transaction.
1394 : */
1395 15 : stop_skipping_changes();
1396 15 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1397 :
1398 15 : pgstat_report_activity(STATE_IDLE, NULL);
1399 15 : reset_apply_error_context_info();
1400 : }
1401 :
1402 : /*
1403 : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1404 : *
1405 : * Note that we don't need to wait here if the transaction was prepared in a
1406 : * parallel apply worker. In that case, we have already waited for the prepare
1407 : * to finish in apply_handle_stream_prepare() which will ensure all the
1408 : * operations in that transaction have happened in the subscriber, so no
1409 : * concurrent transaction can cause deadlock or transaction dependency issues.
1410 : */
1411 : static void
1412 22 : apply_handle_commit_prepared(StringInfo s)
1413 : {
1414 : LogicalRepCommitPreparedTxnData prepare_data;
1415 : char gid[GIDSIZE];
1416 :
1417 22 : logicalrep_read_commit_prepared(s, &prepare_data);
1418 22 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1419 :
1420 : /* Compute GID for two_phase transactions. */
 : /* Must match the GID built in apply_handle_prepare_internal(). */
1421 22 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1422 : gid, sizeof(gid));
1423 :
1424 : /* There is no transaction when COMMIT PREPARED is called */
1425 22 : begin_replication_step();
1426 :
1427 : /*
1428 : * Update origin state so we can restart streaming from correct position
1429 : * in case of crash.
1430 : */
1431 22 : replorigin_xact_state.origin_lsn = prepare_data.end_lsn;
1432 22 : replorigin_xact_state.origin_timestamp = prepare_data.commit_time;
1433 :
1434 22 : FinishPreparedTransaction(gid, true);
1435 22 : end_replication_step();
1436 22 : CommitTransactionCommand();
1437 22 : pgstat_report_stat(false);
1438 :
1439 22 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1440 22 : in_remote_transaction = false;
1441 :
1442 : /*
1443 : * Process any tables that are being synchronized in parallel, as well as
1444 : * any newly added tables or sequences.
1445 : */
1446 22 : ProcessSyncingRelations(prepare_data.end_lsn);
1447 :
1448 22 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1449 :
1450 22 : pgstat_report_activity(STATE_IDLE, NULL);
1451 22 : reset_apply_error_context_info();
1452 : }
1453 :
1454 : /*
1455 : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1456 : *
1457 : * Note that we don't need to wait here if the transaction was prepared in a
1458 : * parallel apply worker. In that case, we have already waited for the prepare
1459 : * to finish in apply_handle_stream_prepare() which will ensure all the
1460 : * operations in that transaction have happened in the subscriber, so no
1461 : * concurrent transaction can cause deadlock or transaction dependency issues.
1462 : */
1463 : static void
1464 5 : apply_handle_rollback_prepared(StringInfo s)
1465 : {
1466 : LogicalRepRollbackPreparedTxnData rollback_data;
1467 : char gid[GIDSIZE];
1468 :
1469 5 : logicalrep_read_rollback_prepared(s, &rollback_data);
1470 5 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1471 :
1472 : /* Compute GID for two_phase transactions. */
 : /* Must match the GID built in apply_handle_prepare_internal(). */
1473 5 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1474 : gid, sizeof(gid));
1475 :
1476 : /*
1477 : * It is possible that we haven't received prepare because it occurred
1478 : * before walsender reached a consistent point or the two_phase was still
1479 : * not enabled by that time, so in such cases, we need to skip rollback
1480 : * prepared.
1481 : */
1482 5 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1483 : rollback_data.prepare_time))
1484 : {
1485 : /*
1486 : * Update origin state so we can restart streaming from correct
1487 : * position in case of crash.
1488 : */
1489 5 : replorigin_xact_state.origin_lsn = rollback_data.rollback_end_lsn;
1490 5 : replorigin_xact_state.origin_timestamp = rollback_data.rollback_time;
1491 :
1492 : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1493 5 : begin_replication_step();
1494 5 : FinishPreparedTransaction(gid, false);
1495 5 : end_replication_step();
1496 5 : CommitTransactionCommand();
1497 :
1498 5 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1499 : }
1500 :
1501 5 : pgstat_report_stat(false);
1502 :
1503 : /*
1504 : * It is okay not to set the local_end LSN for the rollback of prepared
1505 : * transaction because we always flush the WAL record for it. See
1506 : * apply_handle_prepare.
1507 : */
1508 5 : store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
1509 5 : in_remote_transaction = false;
1510 :
1511 : /*
1512 : * Process any tables that are being synchronized in parallel, as well as
1513 : * any newly added tables or sequences.
1514 : */
1515 5 : ProcessSyncingRelations(rollback_data.rollback_end_lsn);
1516 :
1517 5 : pgstat_report_activity(STATE_IDLE, NULL);
1518 5 : reset_apply_error_context_info();
1519 : }
1520 :
1521 : /*
1522 : * Handle STREAM PREPARE.
     : *
     : * Depending on the apply action, the leader either replays the changes
     : * it spooled to a file and prepares the transaction itself, hands the
     : * message over to a parallel apply worker (switching to partial
     : * serialization if the message cannot be sent to that worker), or --
     : * when running as the parallel apply worker -- prepares the transaction
     : * directly.
1523 : */
1524 : static void
1525 17 : apply_handle_stream_prepare(StringInfo s)
1526 : {
1527 : LogicalRepPreparedTxnData prepare_data;
1528 : ParallelApplyWorkerInfo *winfo;
1529 : TransApplyAction apply_action;
1530 :
1531 : /* Save the message before it is consumed. */
1532 17 : StringInfoData original_msg = *s;
1533 :
1534 17 : if (in_streamed_transaction)
1535 0 : ereport(ERROR,
1536 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1537 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1538 :
1539 : /* Tablesync should never receive prepare. */
1540 17 : if (am_tablesync_worker())
1541 0 : ereport(ERROR,
1542 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1543 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1544 :
1545 17 : logicalrep_read_stream_prepare(s, &prepare_data);
1546 17 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1547 :
1548 17 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1549 :
1550 17 : switch (apply_action)
1551 : {
1552 5 : case TRANS_LEADER_APPLY:
1553 :
1554 : /*
1555 : * The transaction has been serialized to file, so replay all the
1556 : * spooled operations.
1557 : */
1558 5 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1559 : prepare_data.xid, prepare_data.prepare_lsn);
1560 :
1561 : /* Mark the transaction as prepared. */
1562 5 : apply_handle_prepare_internal(&prepare_data);
1563 :
1564 5 : CommitTransactionCommand();
1565 :
1566 : /*
1567 : * It is okay not to set the local_end LSN for the prepare because
1568 : * we always flush the prepare record. See apply_handle_prepare.
1569 : */
1570 5 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1571 :
1572 5 : in_remote_transaction = false;
1573 :
1574 : /* Unlink the files with serialized changes and subxact info. */
1575 5 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1576 :
1577 5 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1578 5 : break;
1579 :
1580 5 : case TRANS_LEADER_SEND_TO_PARALLEL:
1581 : Assert(winfo);
1582 :
1583 5 : if (pa_send_data(winfo, s->len, s->data))
1584 : {
1585 : /* Finish processing the streaming transaction. */
1586 5 : pa_xact_finish(winfo, prepare_data.end_lsn);
1587 3 : break;
1588 : }
1589 :
1590 : /*
1591 : * Switch to serialize mode when we are not able to send the
1592 : * change to parallel apply worker.
1593 : */
1594 0 : pa_switch_to_partial_serialize(winfo, true);
1595 :
1596 : pg_fallthrough;
1597 1 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1598 : Assert(winfo);
1599 :
     : /* Spool the original message so the parallel worker can replay it. */
1600 1 : stream_open_and_write_change(prepare_data.xid,
1601 : LOGICAL_REP_MSG_STREAM_PREPARE,
1602 : &original_msg);
1603 :
1604 1 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1605 :
1606 : /* Finish processing the streaming transaction. */
1607 1 : pa_xact_finish(winfo, prepare_data.end_lsn);
1608 1 : break;
1609 :
1610 6 : case TRANS_PARALLEL_APPLY:
1611 :
1612 : /*
1613 : * If the parallel apply worker is applying spooled messages then
1614 : * close the file before preparing.
1615 : */
1616 6 : if (stream_fd)
1617 1 : stream_close_file();
1618 :
1619 6 : begin_replication_step();
1620 :
1621 : /* Mark the transaction as prepared. */
1622 6 : apply_handle_prepare_internal(&prepare_data);
1623 :
1624 6 : end_replication_step();
1625 :
1626 6 : CommitTransactionCommand();
1627 :
1628 : /*
1629 : * It is okay not to set the local_end LSN for the prepare because
1630 : * we always flush the prepare record. See apply_handle_prepare.
1631 : */
1632 4 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1633 :
     : /* Set the state as finished before releasing the transaction lock. */
1634 4 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1635 4 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1636 :
1637 4 : pa_reset_subtrans();
1638 :
1639 4 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1640 4 : break;
1641 :
1642 0 : default:
1643 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1644 : break;
1645 : }
1646 :
1647 13 : pgstat_report_stat(false);
1648 :
1649 : /*
1650 : * Process any tables that are being synchronized in parallel, as well as
1651 : * any newly added tables or sequences.
1652 : */
1653 13 : ProcessSyncingRelations(prepare_data.end_lsn);
1654 :
1655 : /*
1656 : * Similar to prepare case, the subskiplsn could be left in a case of
1657 : * server crash but it's okay. See the comments in apply_handle_prepare().
1658 : */
1659 13 : stop_skipping_changes();
1660 13 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1661 :
1662 13 : pgstat_report_activity(STATE_IDLE, NULL);
1663 :
1664 13 : reset_apply_error_context_info();
1665 13 : }
1666 :
1667 : /*
1668 : * Handle ORIGIN message.
     : *
     : * The message payload is not used here; we only validate its position in
     : * the stream.
1669 : *
1670 : * TODO, support tracking of multiple origins
1671 : */
1672 : static void
1673 7 : apply_handle_origin(StringInfo s)
1674 : {
1675 : /*
1676 : * ORIGIN message can only come inside streaming transaction or inside
1677 : * remote transaction and before any actual writes.
     : *
     : * A local transaction already open at this point indicates that writes
     : * have begun; tablesync workers are exempted from that check --
     : * NOTE(review): presumably because they keep a transaction open
     : * throughout their run, confirm against tablesync code.
1678 : */
1679 7 : if (!in_streamed_transaction &&
1680 10 : (!in_remote_transaction ||
1681 5 : (IsTransactionState() && !am_tablesync_worker())))
1682 0 : ereport(ERROR,
1683 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1684 : errmsg_internal("ORIGIN message sent out of order")));
1685 7 : }
1686 :
1687 : /*
1688 : * Initialize fileset (if not already done).
1689 : *
1690 : * Create a new file when first_segment is true, otherwise open the existing
1691 : * file.
     : *
     : * Note that this starts a transaction (via begin_replication_step) that
     : * is not committed here; it is committed at stream stop (see
     : * stream_stop_internal and the comments in apply_handle_stream_start).
1692 : */
1693 : void
1694 363 : stream_start_internal(TransactionId xid, bool first_segment)
1695 : {
1696 363 : begin_replication_step();
1697 :
1698 : /*
1699 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1700 : * used for the entire duration of the worker so create it in a permanent
1701 : * context. We create this on the very first streaming message from any
1702 : * transaction and then use it for this and other streaming transactions.
1703 : * Now, we could create a fileset at the start of the worker as well but
1704 : * then we won't be sure that it will ever be used.
1705 : */
1706 363 : if (!MyLogicalRepWorker->stream_fileset)
1707 : {
1708 : MemoryContext oldctx;
1709 :
1710 14 : oldctx = MemoryContextSwitchTo(ApplyContext);
1711 :
1712 14 : MyLogicalRepWorker->stream_fileset = palloc_object(FileSet);
1713 14 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1714 :
1715 14 : MemoryContextSwitchTo(oldctx);
1716 : }
1717 :
1718 : /* Open the spool file for this transaction. */
1719 363 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1720 :
1721 : /* If this is not the first segment, open existing subxact file. */
1722 363 : if (!first_segment)
1723 331 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1724 :
1725 363 : end_replication_step();
1726 363 : }
1727 :
1728 : /*
1729 : * Handle STREAM START message.
     : *
     : * A STREAM START marks the beginning of one chunk of changes belonging to
     : * a streamed in-progress transaction; first_segment (read from the
     : * message) tells whether this is the transaction's very first chunk.
     : * Depending on the apply action, the chunk is serialized to a file,
     : * forwarded to a parallel apply worker, or (in the parallel apply worker
     : * itself) applied directly.
1730 : */
1731 : static void
1732 845 : apply_handle_stream_start(StringInfo s)
1733 : {
1734 : bool first_segment;
1735 : ParallelApplyWorkerInfo *winfo;
1736 : TransApplyAction apply_action;
1737 :
1738 : /* Save the message before it is consumed. */
1739 845 : StringInfoData original_msg = *s;
1740 :
1741 845 : if (in_streamed_transaction)
1742 0 : ereport(ERROR,
1743 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1744 : errmsg_internal("duplicate STREAM START message")));
1745 :
1746 : /* There must not be an active streaming transaction. */
1747 : Assert(!TransactionIdIsValid(stream_xid));
1748 :
1749 : /* notify handle methods we're processing a remote transaction */
1750 845 : in_streamed_transaction = true;
1751 :
1752 : /* extract XID of the top-level transaction */
1753 845 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1754 :
1755 845 : if (!TransactionIdIsValid(stream_xid))
1756 0 : ereport(ERROR,
1757 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1758 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1759 :
1760 845 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1761 :
1762 : /* Try to allocate a worker for the streaming transaction. */
1763 845 : if (first_segment)
1764 88 : pa_allocate_worker(stream_xid);
1765 :
1766 845 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1767 :
1768 845 : switch (apply_action)
1769 : {
1770 343 : case TRANS_LEADER_SERIALIZE:
1771 :
1772 : /*
1773 : * Function stream_start_internal starts a transaction. This
1774 : * transaction will be committed on the stream stop unless it is a
1775 : * tablesync worker in which case it will be committed after
1776 : * processing all the messages. We need this transaction for
1777 : * handling the BufFile, used for serializing the streaming data
1778 : * and subxact info.
1779 : */
1780 343 : stream_start_internal(stream_xid, first_segment);
1781 343 : break;
1782 :
1783 245 : case TRANS_LEADER_SEND_TO_PARALLEL:
1784 : Assert(winfo);
1785 :
1786 : /*
1787 : * Once we start serializing the changes, the parallel apply
1788 : * worker will wait for the leader to release the stream lock
1789 : * until the end of the transaction. So, we don't need to release
1790 : * the lock or increment the stream count in that case.
1791 : */
1792 245 : if (pa_send_data(winfo, s->len, s->data))
1793 : {
1794 : /*
1795 : * Unlock the shared object lock so that the parallel apply
1796 : * worker can continue to receive changes.
1797 : */
1798 241 : if (!first_segment)
1799 215 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1800 :
1801 : /*
1802 : * Increment the number of streaming blocks waiting to be
1803 : * processed by parallel apply worker.
1804 : */
1805 241 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1806 :
1807 : /* Cache the parallel apply worker for this transaction. */
1808 241 : pa_set_stream_apply_worker(winfo);
1809 241 : break;
1810 : }
1811 :
1812 : /*
1813 : * Switch to serialize mode when we are not able to send the
1814 : * change to parallel apply worker.
1815 : */
1816 4 : pa_switch_to_partial_serialize(winfo, !first_segment);
1817 :
1818 : pg_fallthrough;
1819 15 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1820 : Assert(winfo);
1821 :
1822 : /*
1823 : * Open the spool file unless it was already opened when switching
1824 : * to serialize mode. The transaction started in
1825 : * stream_start_internal will be committed on the stream stop.
1826 : */
1827 15 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1828 11 : stream_start_internal(stream_xid, first_segment);
1829 :
1830 15 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1831 :
1832 : /* Cache the parallel apply worker for this transaction. */
1833 15 : pa_set_stream_apply_worker(winfo);
1834 15 : break;
1835 :
1836 246 : case TRANS_PARALLEL_APPLY:
1837 246 : if (first_segment)
1838 : {
1839 : /* Hold the lock until the end of the transaction. */
1840 30 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1841 30 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1842 :
1843 : /*
1844 : * Signal the leader apply worker, as it may be waiting for
1845 : * us.
1846 : */
1847 30 : logicalrep_worker_wakeup(WORKERTYPE_APPLY,
1848 30 : MyLogicalRepWorker->subid, InvalidOid);
1849 : }
1850 :
     : /* Reset the per-chunk change counter reported at STREAM STOP. */
1851 246 : parallel_stream_nchanges = 0;
1852 246 : break;
1853 :
1854 0 : default:
1855 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1856 : break;
1857 : }
1858 :
1859 845 : pgstat_report_activity(STATE_RUNNING, NULL);
1860 845 : }
1861 :
1862 : /*
1863 : * Update the information about subxacts and close the file.
1864 : *
1865 : * This function should be called when the stream_start_internal function has
1866 : * been called.
     : *
     : * This also commits the per-stream transaction started by
     : * stream_start_internal.
1867 : */
1868 : void
1869 363 : stream_stop_internal(TransactionId xid)
1870 : {
1871 : /*
1872 : * Serialize information about subxacts for the toplevel transaction, then
1873 : * close the stream messages spool file.
1874 : */
1875 363 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1876 363 : stream_close_file();
1877 :
1878 : /* We must be in a valid transaction state */
1879 : Assert(IsTransactionState());
1880 :
1881 : /* Commit the per-stream transaction */
1882 363 : CommitTransactionCommand();
1883 :
1884 : /* Reset per-stream context */
1885 363 : MemoryContextReset(LogicalStreamingContext);
1886 363 : }
1887 :
1888 : /*
1889 : * Handle STREAM STOP message.
     : *
     : * Marks the end of the current chunk of a streamed transaction. The
     : * leader either finalizes the spool file, forwards the message to the
     : * parallel apply worker (taking the stream lock first), or -- as the
     : * parallel apply worker -- waits for the next chunk.
1890 : */
1891 : static void
1892 844 : apply_handle_stream_stop(StringInfo s)
1893 : {
1894 : ParallelApplyWorkerInfo *winfo;
1895 : TransApplyAction apply_action;
1896 :
1897 844 : if (!in_streamed_transaction)
1898 0 : ereport(ERROR,
1899 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1900 : errmsg_internal("STREAM STOP message without STREAM START")));
1901 :
1902 844 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1903 :
1904 844 : switch (apply_action)
1905 : {
1906 343 : case TRANS_LEADER_SERIALIZE:
1907 343 : stream_stop_internal(stream_xid);
1908 343 : break;
1909 :
1910 241 : case TRANS_LEADER_SEND_TO_PARALLEL:
1911 : Assert(winfo);
1912 :
1913 : /*
1914 : * Lock before sending the STREAM_STOP message so that the leader
1915 : * can hold the lock first and the parallel apply worker will wait
1916 : * for leader to release the lock. See Locking Considerations atop
1917 : * applyparallelworker.c.
1918 : */
1919 241 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1920 :
1921 241 : if (pa_send_data(winfo, s->len, s->data))
1922 : {
1923 241 : pa_set_stream_apply_worker(NULL);
1924 241 : break;
1925 : }
1926 :
1927 : /*
1928 : * Switch to serialize mode when we are not able to send the
1929 : * change to parallel apply worker.
1930 : */
1931 0 : pa_switch_to_partial_serialize(winfo, true);
1932 :
1933 : pg_fallthrough;
1934 15 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1935 15 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1936 15 : stream_stop_internal(stream_xid);
1937 15 : pa_set_stream_apply_worker(NULL);
1938 15 : break;
1939 :
1940 245 : case TRANS_PARALLEL_APPLY:
1941 245 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1942 : parallel_stream_nchanges);
1943 :
1944 : /*
1945 : * By the time parallel apply worker is processing the changes in
1946 : * the current streaming block, the leader apply worker may have
1947 : * sent multiple streaming blocks. This can lead to parallel apply
1948 : * worker start waiting even when there are more chunk of streams
1949 : * in the queue. So, try to lock only if there is no message left
1950 : * in the queue. See Locking Considerations atop
1951 : * applyparallelworker.c.
1952 : *
1953 : * Note that here we have a race condition where we can start
1954 : * waiting even when there are pending streaming chunks. This can
1955 : * happen if the leader sends another streaming block and acquires
1956 : * the stream lock again after the parallel apply worker checks
1957 : * that there is no pending streaming block and before it actually
1958 : * starts waiting on a lock. We can handle this case by not
1959 : * allowing the leader to increment the stream block count during
1960 : * the time parallel apply worker acquires the lock but it is not
1961 : * clear whether that is worth the complexity.
1962 : *
1963 : * Now, if this missed chunk contains rollback to savepoint, then
1964 : * there is a risk of deadlock which probably shouldn't happen
1965 : * after restart.
1966 : */
1967 245 : pa_decr_and_wait_stream_block();
1968 243 : break;
1969 :
1970 0 : default:
1971 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1972 : break;
1973 : }
1974 :
     : /* The chunk is over; forget the streamed transaction's XID. */
1975 842 : in_streamed_transaction = false;
1976 842 : stream_xid = InvalidTransactionId;
1977 :
1978 : /*
1979 : * The parallel apply worker could be in a transaction in which case we
1980 : * need to report the state as STATE_IDLEINTRANSACTION.
1981 : */
1982 842 : if (IsTransactionOrTransactionBlock())
1983 243 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1984 : else
1985 599 : pgstat_report_activity(STATE_IDLE, NULL);
1986 :
1987 842 : reset_apply_error_context_info();
1988 842 : }
1989 :
1990 : /*
1991 : * Helper function to handle STREAM ABORT message when the transaction was
1992 : * serialized to file.
     : *
     : * xid is the toplevel transaction; subxid identifies the aborted
     : * subtransaction. When the two are equal the whole transaction is being
     : * aborted and the serialized files are simply discarded; otherwise the
     : * changes file is truncated back to the aborted subxact's start offset.
1993 : */
1994 : static void
1995 14 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1996 : {
1997 : /*
1998 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1999 : * just delete the files with serialized info.
2000 : */
2001 14 : if (xid == subxid)
2002 1 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2003 : else
2004 : {
2005 : /*
2006 : * OK, so it's a subxact. We need to read the subxact file for the
2007 : * toplevel transaction, determine the offset tracked for the subxact,
2008 : * and truncate the file with changes. We also remove the subxacts
2009 : * with higher offsets (or rather higher XIDs).
2010 : *
2011 : * We intentionally scan the array from the tail, because we're likely
2012 : * aborting a change for the most recent subtransactions.
2013 : *
2014 : * We can't use the binary search here as subxact XIDs won't
2015 : * necessarily arrive in sorted order, consider the case where we have
2016 : * released the savepoint for multiple subtransactions and then
2017 : * performed rollback to savepoint for one of the earlier
2018 : * sub-transaction.
2019 : */
2020 : int64 i;
2021 : int64 subidx;
2022 : BufFile *fd;
2023 13 : bool found = false;
2024 : char path[MAXPGPATH];
2025 :
2026 13 : subidx = -1;
2027 13 : begin_replication_step();
2028 13 : subxact_info_read(MyLogicalRepWorker->subid, xid);
2029 :
2030 15 : for (i = subxact_data.nsubxacts; i > 0; i--)
2031 : {
2032 11 : if (subxact_data.subxacts[i - 1].xid == subxid)
2033 : {
2034 9 : subidx = (i - 1);
2035 9 : found = true;
2036 9 : break;
2037 : }
2038 : }
2039 :
2040 : /*
2041 : * If it's an empty sub-transaction then we will not find the subxid
2042 : * here so just cleanup the subxact info and return.
2043 : */
2044 13 : if (!found)
2045 : {
2046 : /* Cleanup the subxact info */
2047 4 : cleanup_subxact_info();
2048 4 : end_replication_step();
2049 4 : CommitTransactionCommand();
2050 4 : return;
2051 : }
2052 :
2053 : /* open the changes file */
2054 9 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2055 9 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
2056 : O_RDWR, false);
2057 :
2058 : /* OK, truncate the file at the right offset */
2059 9 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
2060 9 : subxact_data.subxacts[subidx].offset);
2061 9 : BufFileClose(fd);
2062 :
2063 : /* discard the subxacts added later */
2064 9 : subxact_data.nsubxacts = subidx;
2065 :
2066 : /* write the updated subxact list */
2067 9 : subxact_info_write(MyLogicalRepWorker->subid, xid);
2068 :
2069 9 : end_replication_step();
2070 9 : CommitTransactionCommand();
2071 : }
2072 : }
2073 :
2074 : /*
2075 : * Handle STREAM ABORT message.
     : *
     : * Aborts either a whole streamed transaction (xid == subxid) or one of
     : * its subtransactions. Depending on the apply action this truncates the
     : * spool file, forwards the message to the parallel apply worker, or (as
     : * the parallel apply worker) rolls back the (sub)transaction directly.
2076 : */
2077 : static void
2078 38 : apply_handle_stream_abort(StringInfo s)
2079 : {
2080 : TransactionId xid;
2081 : TransactionId subxid;
2082 : LogicalRepStreamAbortData abort_data;
2083 : ParallelApplyWorkerInfo *winfo;
2084 : TransApplyAction apply_action;
2085 :
2086 : /* Save the message before it is consumed. */
2087 38 : StringInfoData original_msg = *s;
2088 : bool toplevel_xact;
2089 :
2090 38 : if (in_streamed_transaction)
2091 0 : ereport(ERROR,
2092 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2093 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
2094 :
2095 : /* We receive abort information only when we can apply in parallel. */
2096 38 : logicalrep_read_stream_abort(s, &abort_data,
2097 38 : MyLogicalRepWorker->parallel_apply);
2098 :
2099 38 : xid = abort_data.xid;
2100 38 : subxid = abort_data.subxid;
2101 38 : toplevel_xact = (xid == subxid);
2102 :
2103 38 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2104 :
2105 38 : apply_action = get_transaction_apply_action(xid, &winfo);
2106 :
2107 38 : switch (apply_action)
2108 : {
2109 14 : case TRANS_LEADER_APPLY:
2110 :
2111 : /*
2112 : * We are in the leader apply worker and the transaction has been
2113 : * serialized to file.
2114 : */
2115 14 : stream_abort_internal(xid, subxid);
2116 :
2117 14 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2118 14 : break;
2119 :
2120 10 : case TRANS_LEADER_SEND_TO_PARALLEL:
2121 : Assert(winfo);
2122 :
2123 : /*
2124 : * For the case of aborting the subtransaction, we increment the
2125 : * number of streaming blocks and take the lock again before
2126 : * sending the STREAM_ABORT to ensure that the parallel apply
2127 : * worker will wait on the lock for the next set of changes after
2128 : * processing the STREAM_ABORT message if it is not already
2129 : * waiting for STREAM_STOP message.
2130 : *
2131 : * It is important to perform this locking before sending the
2132 : * STREAM_ABORT message so that the leader can hold the lock first
2133 : * and the parallel apply worker will wait for the leader to
2134 : * release the lock. This is the same as what we do in
2135 : * apply_handle_stream_stop. See Locking Considerations atop
2136 : * applyparallelworker.c.
2137 : */
2138 10 : if (!toplevel_xact)
2139 : {
2140 9 : pa_unlock_stream(xid, AccessExclusiveLock);
2141 9 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
2142 9 : pa_lock_stream(xid, AccessExclusiveLock);
2143 : }
2144 :
2145 10 : if (pa_send_data(winfo, s->len, s->data))
2146 : {
2147 : /*
2148 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2149 : * wait here for the parallel apply worker to finish as that
2150 : * is not required to maintain the commit order and won't have
2151 : * the risk of failures due to transaction dependencies and
2152 : * deadlocks. However, it is possible that before the parallel
2153 : * worker finishes and we clear the worker info, the xid
2154 : * wraparound happens on the upstream and a new transaction
2155 : * with the same xid can appear and that can lead to duplicate
2156 : * entries in ParallelApplyTxnHash. Yet another problem could
2157 : * be that we may have serialized the changes in partial
2158 : * serialize mode and the file containing xact changes may
2159 : * already exist, and after xid wraparound trying to create
2160 : * the file for the same xid can lead to an error. To avoid
2161 : * these problems, we decide to wait for the aborts to finish.
2162 : *
2163 : * Note, it is okay to not update the flush location position
2164 : * for aborts as in worst case that means such a transaction
2165 : * won't be sent again after restart.
2166 : */
2167 10 : if (toplevel_xact)
2168 1 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2169 :
2170 10 : break;
2171 : }
2172 :
2173 : /*
2174 : * Switch to serialize mode when we are not able to send the
2175 : * change to parallel apply worker.
2176 : */
2177 0 : pa_switch_to_partial_serialize(winfo, true);
2178 :
2179 : pg_fallthrough;
2180 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2181 : Assert(winfo);
2182 :
2183 : /*
2184 : * Parallel apply worker might have applied some changes, so write
2185 : * the STREAM_ABORT message so that it can rollback the
2186 : * subtransaction if needed.
2187 : */
2188 2 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
2189 : &original_msg);
2190 :
2191 2 : if (toplevel_xact)
2192 : {
2193 1 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2194 1 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2195 : }
2196 2 : break;
2197 :
2198 12 : case TRANS_PARALLEL_APPLY:
2199 :
2200 : /*
2201 : * If the parallel apply worker is applying spooled messages then
2202 : * close the file before aborting.
2203 : */
2204 12 : if (toplevel_xact && stream_fd)
2205 1 : stream_close_file();
2206 :
2207 12 : pa_stream_abort(&abort_data);
2208 :
2209 : /*
2210 : * We need to wait after processing rollback to savepoint for the
2211 : * next set of changes.
2212 : *
2213 : * We have a race condition here due to which we can start waiting
2214 : * here when there are more chunk of streams in the queue. See
2215 : * apply_handle_stream_stop.
2216 : */
2217 12 : if (!toplevel_xact)
2218 10 : pa_decr_and_wait_stream_block();
2219 :
2220 12 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2221 12 : break;
2222 :
2223 0 : default:
2224 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2225 : break;
2226 : }
2227 :
2228 38 : reset_apply_error_context_info();
2229 38 : }
2230 :
2231 : /*
2232 : * Ensure that the passed location is fileset's end.
     : *
     : * Errors out if (fileno, offset) is not the end of the changes file of
     : * transaction xid in stream_fileset. Used by apply_spooled_messages to
     : * verify that a transaction-end message was the last one spooled.
2233 : */
2234 : static void
2235 4 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2236 : pgoff_t offset)
2237 : {
2238 : char path[MAXPGPATH];
2239 : BufFile *fd;
2240 : int last_fileno;
2241 : pgoff_t last_offset;
2242 :
2243 : Assert(!IsTransactionState());
2244 :
2245 4 : begin_replication_step();
2246 :
2247 4 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2248 :
2249 4 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2250 :
     : /* Seek to the end of the file and record that position. */
2251 4 : BufFileSeek(fd, 0, 0, SEEK_END);
2252 4 : BufFileTell(fd, &last_fileno, &last_offset);
2253 :
2254 4 : BufFileClose(fd);
2255 :
2256 4 : end_replication_step();
2257 :
2258 4 : if (last_fileno != fileno || last_offset != offset)
2259 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2260 : path);
2261 4 : }
2262 :
2263 : /*
2264 : * Common spoolfile processing.
     : *
     : * Replay all changes spooled to file for transaction xid, passing each
     : * message through apply_dispatch. lsn is the transaction's end-of-stream
     : * LSN as supplied by the caller (prepare or commit LSN); it becomes
     : * remote_final_lsn and, in non-parallel workers, determines whether the
     : * changes should be skipped.
2265 : */
2266 : void
2267 31 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2268 : XLogRecPtr lsn)
2269 : {
2270 : int nchanges;
2271 : char path[MAXPGPATH];
2272 31 : char *buffer = NULL;
2273 : MemoryContext oldcxt;
2274 : ResourceOwner oldowner;
2275 : int fileno;
2276 : pgoff_t offset;
2277 :
2278 31 : if (!am_parallel_apply_worker())
2279 27 : maybe_start_skipping_changes(lsn);
2280 :
2281 : /* Make sure we have an open transaction */
2282 31 : begin_replication_step();
2283 :
2284 : /*
2285 : * Allocate file handle and memory required to process all the messages in
2286 : * TopTransactionContext to avoid them getting reset after each message is
2287 : * processed.
2288 : */
2289 31 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2290 :
2291 : /* Open the spool file for the committed/prepared transaction */
2292 31 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2293 31 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2294 :
2295 : /*
2296 : * Make sure the file is owned by the toplevel transaction so that the
2297 : * file will not be accidentally closed when aborting a subtransaction.
2298 : */
2299 31 : oldowner = CurrentResourceOwner;
2300 31 : CurrentResourceOwner = TopTransactionResourceOwner;
2301 :
2302 31 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2303 :
2304 31 : CurrentResourceOwner = oldowner;
2305 :
2306 31 : buffer = palloc(BLCKSZ);
2307 :
2308 31 : MemoryContextSwitchTo(oldcxt);
2309 :
2310 31 : remote_final_lsn = lsn;
2311 :
2312 : /*
2313 : * Make sure the handle apply_dispatch methods are aware we're in a remote
2314 : * transaction.
2315 : */
2316 31 : in_remote_transaction = true;
2317 31 : pgstat_report_activity(STATE_RUNNING, NULL);
2318 :
2319 31 : end_replication_step();
2320 :
2321 : /*
2322 : * Read the entries one by one and pass them through the same logic as in
2323 : * apply_dispatch.
2324 : */
2325 31 : nchanges = 0;
2326 : while (true)
2327 88469 : {
2328 : StringInfoData s2;
2329 : size_t nbytes;
2330 : int len;
2331 :
2332 88500 : CHECK_FOR_INTERRUPTS();
2333 :
2334 : /* read length of the on-disk record */
2335 88500 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2336 :
2337 : /* have we reached end of the file? */
2338 88500 : if (nbytes == 0)
2339 26 : break;
2340 :
2341 : /* do we have a correct length? */
2342 88474 : if (len <= 0)
2343 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2344 : len, path);
2345 :
2346 : /* make sure we have sufficiently large buffer */
2347 88474 : buffer = repalloc(buffer, len);
2348 :
2349 : /* and finally read the data into the buffer */
2350 88474 : BufFileReadExact(stream_fd, buffer, len);
2351 :
     : /* remember the position right after this message, for the check below */
2352 88474 : BufFileTell(stream_fd, &fileno, &offset);
2353 :
2354 : /* init a stringinfo using the buffer and call apply_dispatch */
2355 88474 : initReadOnlyStringInfo(&s2, buffer, len);
2356 :
2357 : /* Ensure we are reading the data into our memory context. */
2358 88474 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2359 :
2360 88474 : apply_dispatch(&s2);
2361 :
2362 88473 : MemoryContextReset(ApplyMessageContext);
2363 :
2364 88473 : MemoryContextSwitchTo(oldcxt);
2365 :
2366 88473 : nchanges++;
2367 :
2368 : /*
2369 : * It is possible the file has been closed because we have processed
2370 : * the transaction end message like stream_commit in which case that
2371 : * must be the last message.
2372 : */
2373 88473 : if (!stream_fd)
2374 : {
2375 4 : ensure_last_message(stream_fileset, xid, fileno, offset);
2376 4 : break;
2377 : }
2378 :
2379 88469 : if (nchanges % 1000 == 0)
2380 83 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2381 : nchanges, path);
2382 : }
2383 :
2384 30 : if (stream_fd)
2385 26 : stream_close_file();
2386 :
2387 30 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2388 : nchanges, path);
2389 :
2390 30 : return;
2391 : }
2392 :
2393 : /*
2394 : * Handle STREAM COMMIT message.
     : *
     : * Depending on the apply action, the leader either replays the spooled
     : * changes and commits the transaction itself, hands the message over to a
     : * parallel apply worker (switching to partial serialization if the
     : * message cannot be sent to that worker), or -- when running as the
     : * parallel apply worker -- commits the transaction directly.
2395 : */
2396 : static void
2397 61 : apply_handle_stream_commit(StringInfo s)
2398 : {
2399 : TransactionId xid;
2400 : LogicalRepCommitData commit_data;
2401 : ParallelApplyWorkerInfo *winfo;
2402 : TransApplyAction apply_action;
2403 :
2404 : /* Save the message before it is consumed. */
2405 61 : StringInfoData original_msg = *s;
2406 :
2407 61 : if (in_streamed_transaction)
2408 0 : ereport(ERROR,
2409 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2410 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2411 :
2412 61 : xid = logicalrep_read_stream_commit(s, &commit_data);
2413 61 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2414 :
2415 61 : apply_action = get_transaction_apply_action(xid, &winfo);
2416 :
2417 61 : switch (apply_action)
2418 : {
2419 22 : case TRANS_LEADER_APPLY:
2420 :
2421 : /*
2422 : * The transaction has been serialized to file, so replay all the
2423 : * spooled operations.
2424 : */
2425 22 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2426 : commit_data.commit_lsn);
2427 :
2428 21 : apply_handle_commit_internal(&commit_data);
2429 :
2430 : /* Unlink the files with serialized changes and subxact info. */
2431 21 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2432 :
2433 21 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2434 21 : break;
2435 :
2436 18 : case TRANS_LEADER_SEND_TO_PARALLEL:
2437 : Assert(winfo);
2438 :
2439 18 : if (pa_send_data(winfo, s->len, s->data))
2440 : {
2441 : /* Finish processing the streaming transaction. */
2442 18 : pa_xact_finish(winfo, commit_data.end_lsn);
2443 17 : break;
2444 : }
2445 :
2446 : /*
2447 : * Switch to serialize mode when we are not able to send the
2448 : * change to parallel apply worker.
2449 : */
2450 0 : pa_switch_to_partial_serialize(winfo, true);
2451 :
2452 : pg_fallthrough;
2453 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2454 : Assert(winfo);
2455 :
     : /* Spool the original message so the parallel worker can replay it. */
2456 2 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2457 : &original_msg);
2458 :
2459 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2460 :
2461 : /* Finish processing the streaming transaction. */
2462 2 : pa_xact_finish(winfo, commit_data.end_lsn);
2463 2 : break;
2464 :
2465 19 : case TRANS_PARALLEL_APPLY:
2466 :
2467 : /*
2468 : * If the parallel apply worker is applying spooled messages then
2469 : * close the file before committing.
2470 : */
2471 19 : if (stream_fd)
2472 2 : stream_close_file();
2473 :
2474 19 : apply_handle_commit_internal(&commit_data);
2475 :
2476 19 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2477 :
2478 : /*
2479 : * It is important to set the transaction state as finished before
2480 : * releasing the lock. See pa_wait_for_xact_finish.
2481 : */
2482 19 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2483 19 : pa_unlock_transaction(xid, AccessExclusiveLock);
2484 :
2485 19 : pa_reset_subtrans();
2486 :
2487 19 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2488 19 : break;
2489 :
2490 0 : default:
2491 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2492 : break;
2493 : }
2494 :
2495 : /*
2496 : * Process any tables that are being synchronized in parallel, as well as
2497 : * any newly added tables or sequences.
2498 : */
2499 59 : ProcessSyncingRelations(commit_data.end_lsn);
2500 :
2501 59 : pgstat_report_activity(STATE_IDLE, NULL);
2502 :
2503 59 : reset_apply_error_context_info();
2504 59 : }
2505 :
2506 : /*
2507 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
     : *
     : * Commits the local transaction (if any), updates origin state so
     : * streaming can restart from the right position after a crash, and
     : * records the flush position for commit_data->end_lsn.
2508 : */
2509 : static void
2510 500 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2511 : {
2512 500 : if (is_skipping_changes())
2513 : {
2514 2 : stop_skipping_changes();
2515 :
2516 : /*
2517 : * Start a new transaction to clear the subskiplsn, if not started
2518 : * yet.
2519 : */
2520 2 : if (!IsTransactionState())
2521 1 : StartTransactionCommand();
2522 : }
2523 :
2524 500 : if (IsTransactionState())
2525 : {
2526 : /*
2527 : * The transaction is either non-empty or skipped, so we clear the
2528 : * subskiplsn.
2529 : */
2530 500 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2531 :
2532 : /*
2533 : * Update origin state so we can restart streaming from correct
2534 : * position in case of crash.
2535 : */
2536 500 : replorigin_xact_state.origin_lsn = commit_data->end_lsn;
2537 500 : replorigin_xact_state.origin_timestamp = commit_data->committime;
2538 :
2539 500 : CommitTransactionCommand();
2540 :
2541 500 : if (IsTransactionBlock())
2542 : {
2543 4 : EndTransactionBlock(false);
2544 4 : CommitTransactionCommand();
2545 : }
2546 :
2547 500 : pgstat_report_stat(false);
2548 :
2549 500 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2550 : }
2551 : else
2552 : {
     : /* No local transaction was started, so there is nothing to commit. */
2553 : /* Process any invalidation messages that might have accumulated. */
2554 0 : AcceptInvalidationMessages();
2555 0 : maybe_reread_subscription();
2556 : }
2557 :
2558 500 : in_remote_transaction = false;
2559 500 : }
2560 :
2561 : /*
2562 : * Handle RELATION message.
2563 : *
2564 : * Note we don't do validation against local schema here. The validation
2565 : * against local schema is postponed until first change for given relation
2566 : * comes as we only care about it when applying changes for it anyway and we
2567 : * do less locking this way.
2568 : */
2569 : static void
2570 520 : apply_handle_relation(StringInfo s)
2571 : {
2572 : LogicalRepRelation *rel;
2573 :
     : /* Return if the message was consumed as part of a streamed transaction. */
2574 520 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2575 35 : return;
2576 :
     : /* Parse the remote relation description and refresh the relation map. */
2577 485 : rel = logicalrep_read_rel(s);
2578 485 : logicalrep_relmap_update(rel);
2579 :
2580 : /* Also reset all entries in the partition map that refer to remoterel. */
2581 485 : logicalrep_partmap_reset_relmap(rel);
2582 : }
2583 :
2584 : /*
2585 : * Handle TYPE message.
2586 : *
2587 : * This implementation pays no attention to TYPE messages; we expect the user
2588 : * to have set things up so that the incoming data is acceptable to the input
2589 : * functions for the locally subscribed tables. Hence, we just read and
2590 : * discard the message.
2591 : */
2592 : static void
2593 18 : apply_handle_type(StringInfo s)
2594 : {
2595 : LogicalRepTyp typ;
2596 :
     : /* Return if the message was consumed as part of a streamed transaction. */
2597 18 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2598 0 : return;
2599 :
     : /* Consume the message payload; the parsed result is ignored. */
2600 18 : logicalrep_read_typ(s, &typ);
2601 : }
2602 :
2603 : /*
2604 : * Check that we (the subscription owner) have sufficient privileges on the
2605 : * target relation to perform the given operation.
     : *
     : * rel - already-opened local target relation
     : * mode - AclMode required for the operation (e.g. ACL_INSERT, ACL_SELECT)
     : *
     : * Raises an error if the current user lacks the privilege, or if
     : * row-level security is enabled on the relation, since apply workers
     : * have no infrastructure to honor RLS policies.
2606 : */
2607 : static void
2608 241045 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2609 : {
2610 : Oid relid;
2611 : AclResult aclresult;
2612 :
2613 241045 : relid = RelationGetRelid(rel);
2614 241045 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2615 241045 : if (aclresult != ACLCHECK_OK)
2616 10 : aclcheck_error(aclresult,
2617 10 : get_relkind_objtype(rel->rd_rel->relkind),
2618 10 : get_rel_name(relid));
2619 :
2620 : /*
2621 : * We lack the infrastructure to honor RLS policies. It might be possible
2622 : * to add such infrastructure here, but tablesync workers lack it, too, so
2623 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2624 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2625 : * replicate subsequent INSERTs, so we forbid all commands the same.
2626 : */
2627 241035 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2628 4 : ereport(ERROR,
2629 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2630 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2631 : GetUserNameFromId(GetUserId(), true),
2632 : RelationGetRelationName(rel))));
2633 241031 : }
2634 :
2635 : /*
2636 : * Handle INSERT message.
     : *
     : * Reads the new tuple from 's', opens the target relation, and inserts
     : * the tuple, routing it to the correct leaf partition when the target is
     : * a partitioned table.  User-supplied code (triggers, defaults) runs as
     : * the table owner unless the subscription's runasowner option is set.
2637 : */
2638 :
2639 : static void
2640 206568 : apply_handle_insert(StringInfo s)
2641 : {
2642 : LogicalRepRelMapEntry *rel;
2643 : LogicalRepTupleData newtup;
2644 : LogicalRepRelId relid;
2645 : UserContext ucxt;
2646 : ApplyExecutionData *edata;
2647 : EState *estate;
2648 : TupleTableSlot *remoteslot;
2649 : MemoryContext oldctx;
2650 : bool run_as_owner;
2651 :
2652 : /*
2653 : * Quick return if we are skipping data modification changes or handling
2654 : * streamed transactions.
2655 : */
2656 403135 : if (is_skipping_changes() ||
2657 196567 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2658 110067 : return;
2659 :
2660 96558 : begin_replication_step();
2661 :
2662 96556 : relid = logicalrep_read_insert(s, &newtup);
2663 96556 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2664 96542 : if (!should_apply_changes_for_rel(rel))
2665 : {
2666 : /*
2667 : * The relation can't become interesting in the middle of the
2668 : * transaction so it's safe to unlock it.
2669 : */
2670 57 : logicalrep_rel_close(rel, RowExclusiveLock);
2671 57 : end_replication_step();
2672 57 : return;
2673 : }
2674 :
2675 : /*
2676 : * Make sure that any user-supplied code runs as the table owner, unless
2677 : * the user has opted out of that behavior.
2678 : */
2679 96485 : run_as_owner = MySubscription->runasowner;
2680 96485 : if (!run_as_owner)
2681 96476 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2682 :
2683 : /* Set relation for error callback */
2684 96485 : apply_error_callback_arg.rel = rel;
2685 :
2686 : /* Initialize the executor state. */
2687 96485 : edata = create_edata_for_relation(rel);
2688 96485 : estate = edata->estate;
2689 96485 : remoteslot = ExecInitExtraTupleSlot(estate,
2690 96485 : RelationGetDescr(rel->localrel),
2691 : &TTSOpsVirtual);
2692 :
2693 : /* Process and store remote tuple in the slot */
2694 96485 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2695 96485 : slot_store_data(remoteslot, rel, &newtup);
2696 96485 : slot_fill_defaults(rel, estate, remoteslot);
2697 96485 : MemoryContextSwitchTo(oldctx);
2698 :
2699 : /* For a partitioned table, insert the tuple into a partition. */
2700 96485 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2701 84 : apply_handle_tuple_routing(edata,
2702 : remoteslot, NULL, CMD_INSERT);
2703 : else
2704 : {
2705 96401 : ResultRelInfo *relinfo = edata->targetRelInfo;
2706 :
2707 96401 : ExecOpenIndices(relinfo, false);
2708 96401 : apply_handle_insert_internal(edata, relinfo, remoteslot);
2709 96384 : ExecCloseIndices(relinfo);
2710 : }
2711 :
2712 96428 : finish_edata(edata);
2713 :
2714 : /* Reset relation for error callback */
2715 96428 : apply_error_callback_arg.rel = NULL;
2716 :
2717 96428 : if (!run_as_owner)
2718 96423 : RestoreUserContext(&ucxt);
2719 :
2720 96428 : logicalrep_rel_close(rel, NoLock);
2721 :
2722 96428 : end_replication_step();
2723 : }
2724 :
2725 : /*
2726 : * Workhorse for apply_handle_insert()
2727 : * relinfo is for the relation we're actually inserting into
2728 : * (could be a child partition of edata->targetRelInfo)
     : *
     : * Checks INSERT privilege, initializes the conflict-arbiter indexes, and
     : * performs the actual insert of 'remoteslot' via the executor.
2729 : */
2730 : static void
2731 96486 : apply_handle_insert_internal(ApplyExecutionData *edata,
2732 : ResultRelInfo *relinfo,
2733 : TupleTableSlot *remoteslot)
2734 : {
2735 96486 : EState *estate = edata->estate;
2736 :
2737 : /* Caller should have opened indexes already. */
2738 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2739 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2740 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2741 :
2742 : /* Caller will not have done this bit. */
2743 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2744 96486 : InitConflictIndexes(relinfo);
2745 :
2746 : /* Do the insert. */
2747 96486 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2748 96479 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2749 96429 : }
2750 :
2751 : /*
2752 : * Check if the logical replication relation is updatable and throw
2753 : * appropriate error if it isn't.
     : *
     : * Returns silently when the relation is updatable, or when it is a
     : * partitioned table (each leaf partition is checked separately);
     : * otherwise raises ERROR with a message explaining what is missing.
2754 : */
2755 : static void
2756 72306 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2757 : {
2758 : /*
2759 : * For partitioned tables, we only need to care if the target partition is
2760 : * updatable (aka has PK or RI defined for it).
2761 : */
2762 72306 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2763 30 : return;
2764 :
2765 : /* Updatable, no error. */
2766 72276 : if (rel->updatable)
2767 72276 : return;
2768 :
2769 : /*
2770 : * We are in error mode so it's fine this is somewhat slow. It's better to
2771 : * give user correct error.
2772 : */
2773 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2774 : {
2775 0 : ereport(ERROR,
2776 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2777 : errmsg("publisher did not send replica identity column "
2778 : "expected by the logical replication target relation \"%s.%s\"",
2779 : rel->remoterel.nspname, rel->remoterel.relname)));
2780 : }
2781 :
2782 0 : ereport(ERROR,
2783 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2784 : errmsg("logical replication target relation \"%s.%s\" has "
2785 : "neither REPLICA IDENTITY index nor PRIMARY "
2786 : "KEY and published relation does not have "
2787 : "REPLICA IDENTITY FULL",
2788 : rel->remoterel.nspname, rel->remoterel.relname)));
2789 : }
2790 :
2791 : /*
2792 : * Handle UPDATE message.
     : *
     : * Reads the old/new tuples from 's', locates the matching local row, and
     : * applies the new column values, routing to the correct leaf partition
     : * when the target is a partitioned table.
2793 : *
2794 : * TODO: FDW support
2795 : */
2796 : static void
2797 66173 : apply_handle_update(StringInfo s)
2798 : {
2799 : LogicalRepRelMapEntry *rel;
2800 : LogicalRepRelId relid;
2801 : UserContext ucxt;
2802 : ApplyExecutionData *edata;
2803 : EState *estate;
2804 : LogicalRepTupleData oldtup;
2805 : LogicalRepTupleData newtup;
2806 : bool has_oldtup;
2807 : TupleTableSlot *remoteslot;
2808 : RTEPermissionInfo *target_perminfo;
2809 : MemoryContext oldctx;
2810 : bool run_as_owner;
2811 :
2812 : /*
2813 : * Quick return if we are skipping data modification changes or handling
2814 : * streamed transactions.
2815 : */
2816 132343 : if (is_skipping_changes() ||
2817 66170 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
2818 34223 : return;
2819 :
2820 31950 : begin_replication_step();
2821 :
2822 31949 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2823 : &newtup);
2824 31949 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2825 31949 : if (!should_apply_changes_for_rel(rel))
2826 : {
2827 : /*
2828 : * The relation can't become interesting in the middle of the
2829 : * transaction so it's safe to unlock it.
2830 : */
2831 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2832 0 : end_replication_step();
2833 0 : return;
2834 : }
2835 :
2836 : /* Set relation for error callback */
2837 31949 : apply_error_callback_arg.rel = rel;
2838 :
2839 : /* Check if we can do the update. */
2840 31949 : check_relation_updatable(rel);
2841 :
2842 : /*
2843 : * Make sure that any user-supplied code runs as the table owner, unless
2844 : * the user has opted out of that behavior.
2845 : */
2846 31949 : run_as_owner = MySubscription->runasowner;
2847 31949 : if (!run_as_owner)
2848 31945 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2849 :
2850 : /* Initialize the executor state. */
2851 31948 : edata = create_edata_for_relation(rel);
2852 31948 : estate = edata->estate;
2853 31948 : remoteslot = ExecInitExtraTupleSlot(estate,
2854 31948 : RelationGetDescr(rel->localrel),
2855 : &TTSOpsVirtual);
2856 :
2857 : /*
2858 : * Populate updatedCols so that per-column triggers can fire, and so
2859 : * executor can correctly pass down indexUnchanged hint. This could
2860 : * include more columns than were actually changed on the publisher
2861 : * because the logical replication protocol doesn't contain that
2862 : * information. But it would for example exclude columns that only exist
2863 : * on the subscriber, since we are not touching those.
2864 : */
2865 31948 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
2866 159351 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2867 : {
2868 127403 : CompactAttribute *att = TupleDescCompactAttr(remoteslot->tts_tupleDescriptor, i);
2869 127403 : int remoteattnum = rel->attrmap->attnums[i];
2870 :
2871 127403 : if (!att->attisdropped && remoteattnum >= 0)
2872 : {
2873 : Assert(remoteattnum < newtup.ncols);
2874 68886 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2875 68886 : target_perminfo->updatedCols =
2876 68886 : bms_add_member(target_perminfo->updatedCols,
2877 : i + 1 - FirstLowInvalidHeapAttributeNumber);
2878 : }
2879 : }
2880 :
2881 : /* Build the search tuple. */
2882 31948 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2883 31948 : slot_store_data(remoteslot, rel,
2884 31948 : has_oldtup ? &oldtup : &newtup);
2885 31948 : MemoryContextSwitchTo(oldctx);
2886 :
2887 : /* For a partitioned table, apply update to correct partition. */
2888 31948 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2889 13 : apply_handle_tuple_routing(edata,
2890 : remoteslot, &newtup, CMD_UPDATE);
2891 : else
2892 31935 : apply_handle_update_internal(edata, edata->targetRelInfo,
2893 : remoteslot, &newtup, rel->localindexoid);
2894 :
2895 31939 : finish_edata(edata);
2896 :
2897 : /* Reset relation for error callback */
2898 31939 : apply_error_callback_arg.rel = NULL;
2899 :
2900 31939 : if (!run_as_owner)
2901 31937 : RestoreUserContext(&ucxt);
2902 :
2903 31939 : logicalrep_rel_close(rel, NoLock);
2904 :
2905 31939 : end_replication_step();
2906 : }
2907 :
2908 : /*
2909 : * Workhorse for apply_handle_update()
2910 : * relinfo is for the relation we're actually updating in
2911 : * (could be a child partition of edata->targetRelInfo)
     : *
     : * Looks up the local tuple matching 'remoteslot' (using 'localindexoid'
     : * if valid), reports any detected conflict (update by a different
     : * origin, tuple recently deleted, or tuple missing), and performs the
     : * actual update when the tuple is found.
2912 : */
2913 : static void
2914 31935 : apply_handle_update_internal(ApplyExecutionData *edata,
2915 : ResultRelInfo *relinfo,
2916 : TupleTableSlot *remoteslot,
2917 : LogicalRepTupleData *newtup,
2918 : Oid localindexoid)
2919 : {
2920 31935 : EState *estate = edata->estate;
2921 31935 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2922 31935 : Relation localrel = relinfo->ri_RelationDesc;
2923 : EPQState epqstate;
2924 31935 : TupleTableSlot *localslot = NULL;
2925 31935 : ConflictTupleInfo conflicttuple = {0};
2926 : bool found;
2927 : MemoryContext oldctx;
2928 :
2929 31935 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2930 31935 : ExecOpenIndices(relinfo, false);
2931 :
2932 31935 : found = FindReplTupleInLocalRel(edata, localrel,
2933 : &relmapentry->remoterel,
2934 : localindexoid,
2935 : remoteslot, &localslot);
2936 :
2937 : /*
2938 : * Tuple found.
2939 : *
2940 : * Note this will fail if there are other conflicting unique indexes.
2941 : */
2942 31928 : if (found)
2943 : {
2944 : /*
2945 : * Report the conflict if the tuple was modified by a different
2946 : * origin.
2947 : */
2948 31919 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2949 2 : &conflicttuple.origin, &conflicttuple.ts) &&
2950 2 : conflicttuple.origin != replorigin_xact_state.origin)
2951 : {
2952 : TupleTableSlot *newslot;
2953 :
2954 : /* Store the new tuple for conflict reporting */
2955 2 : newslot = table_slot_create(localrel, &estate->es_tupleTable);
2956 2 : slot_store_data(newslot, relmapentry, newtup);
2957 :
2958 2 : conflicttuple.slot = localslot;
2959 :
2960 2 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
2961 : remoteslot, newslot,
2962 : list_make1(&conflicttuple));
2963 : }
2964 :
2965 : /* Process and store remote tuple in the slot */
2966 31919 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2967 31919 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2968 31919 : MemoryContextSwitchTo(oldctx);
2969 :
2970 31919 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2971 :
2972 31919 : InitConflictIndexes(relinfo);
2973 :
2974 : /* Do the actual update. */
2975 31919 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2976 31919 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2977 : remoteslot);
2978 : }
2979 : else
2980 : {
2981 : ConflictType type;
2982 9 : TupleTableSlot *newslot = localslot;
2983 :
2984 : /*
2985 : * Detecting whether the tuple was recently deleted or never existed
2986 : * is crucial to avoid misleading the user during conflict handling.
2987 : */
2988 9 : if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
2989 : &conflicttuple.xmin,
2990 : &conflicttuple.origin,
2991 3 : &conflicttuple.ts) &&
2992 3 : conflicttuple.origin != replorigin_xact_state.origin)
2993 3 : type = CT_UPDATE_DELETED;
2994 : else
2995 6 : type = CT_UPDATE_MISSING;
2996 :
2997 : /* Store the new tuple for conflict reporting */
2998 9 : slot_store_data(newslot, relmapentry, newtup);
2999 :
3000 : /*
3001 : * The tuple to be updated could not be found or was deleted. Do
3002 : * nothing except for emitting a log message.
3003 : */
3004 9 : ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
3005 : list_make1(&conflicttuple));
3006 : }
3007 :
3008 : /* Cleanup. */
3009 31926 : ExecCloseIndices(relinfo);
3010 31926 : EvalPlanQualEnd(&epqstate);
3011 : }
3012 :
3013 : /*
3014 : * Handle DELETE message.
     : *
     : * Reads the old tuple from 's', locates the matching local row, and
     : * deletes it, routing to the correct leaf partition when the target is
     : * a partitioned table.
3015 : *
3016 : * TODO: FDW support
3017 : */
3018 : static void
3019 81942 : apply_handle_delete(StringInfo s)
3020 : {
3021 : LogicalRepRelMapEntry *rel;
3022 : LogicalRepTupleData oldtup;
3023 : LogicalRepRelId relid;
3024 : UserContext ucxt;
3025 : ApplyExecutionData *edata;
3026 : EState *estate;
3027 : TupleTableSlot *remoteslot;
3028 : MemoryContext oldctx;
3029 : bool run_as_owner;
3030 :
3031 : /*
3032 : * Quick return if we are skipping data modification changes or handling
3033 : * streamed transactions.
3034 : */
3035 163884 : if (is_skipping_changes() ||
3036 81942 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
3037 41615 : return;
3038 :
3039 40327 : begin_replication_step();
3040 :
3041 40327 : relid = logicalrep_read_delete(s, &oldtup);
3042 40327 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
3043 40327 : if (!should_apply_changes_for_rel(rel))
3044 : {
3045 : /*
3046 : * The relation can't become interesting in the middle of the
3047 : * transaction so it's safe to unlock it.
3048 : */
3049 0 : logicalrep_rel_close(rel, RowExclusiveLock);
3050 0 : end_replication_step();
3051 0 : return;
3052 : }
3053 :
3054 : /* Set relation for error callback */
3055 40327 : apply_error_callback_arg.rel = rel;
3056 :
3057 : /* Check if we can do the delete. */
3058 40327 : check_relation_updatable(rel);
3059 :
3060 : /*
3061 : * Make sure that any user-supplied code runs as the table owner, unless
3062 : * the user has opted out of that behavior.
3063 : */
3064 40327 : run_as_owner = MySubscription->runasowner;
3065 40327 : if (!run_as_owner)
3066 40325 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3067 :
3068 : /* Initialize the executor state. */
3069 40327 : edata = create_edata_for_relation(rel);
3070 40327 : estate = edata->estate;
3071 40327 : remoteslot = ExecInitExtraTupleSlot(estate,
3072 40327 : RelationGetDescr(rel->localrel),
3073 : &TTSOpsVirtual);
3074 :
3075 : /* Build the search tuple. */
3076 40327 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3077 40327 : slot_store_data(remoteslot, rel, &oldtup);
3078 40327 : MemoryContextSwitchTo(oldctx);
3079 :
3080 : /* For a partitioned table, apply delete to correct partition. */
3081 40327 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3082 17 : apply_handle_tuple_routing(edata,
3083 : remoteslot, NULL, CMD_DELETE);
3084 : else
3085 : {
3086 40310 : ResultRelInfo *relinfo = edata->targetRelInfo;
3087 :
3088 40310 : ExecOpenIndices(relinfo, false);
3089 40310 : apply_handle_delete_internal(edata, relinfo,
3090 : remoteslot, rel->localindexoid);
3091 40310 : ExecCloseIndices(relinfo);
3092 : }
3093 :
3094 40327 : finish_edata(edata);
3095 :
3096 : /* Reset relation for error callback */
3097 40327 : apply_error_callback_arg.rel = NULL;
3098 :
3099 40327 : if (!run_as_owner)
3100 40325 : RestoreUserContext(&ucxt);
3101 :
3102 40327 : logicalrep_rel_close(rel, NoLock);
3103 :
3104 40327 : end_replication_step();
3105 : }
3106 :
3107 : /*
3108 : * Workhorse for apply_handle_delete()
3109 : * relinfo is for the relation we're actually deleting from
3110 : * (could be a child partition of edata->targetRelInfo)
     : *
     : * Looks up the local tuple matching 'remoteslot' and deletes it,
     : * reporting a conflict when the tuple was modified by a different
     : * origin or cannot be found.  The caller must have opened the
     : * relation's indexes already (see Assert below).
3111 : */
3112 : static void
3113 40327 : apply_handle_delete_internal(ApplyExecutionData *edata,
3114 : ResultRelInfo *relinfo,
3115 : TupleTableSlot *remoteslot,
3116 : Oid localindexoid)
3117 : {
3118 40327 : EState *estate = edata->estate;
3119 40327 : Relation localrel = relinfo->ri_RelationDesc;
3120 40327 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3121 : EPQState epqstate;
3122 : TupleTableSlot *localslot;
3123 40327 : ConflictTupleInfo conflicttuple = {0};
3124 : bool found;
3125 :
3126 40327 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3127 :
3128 : /* Caller should have opened indexes already. */
3129 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
3130 : !localrel->rd_rel->relhasindex ||
3131 : RelationGetIndexList(localrel) == NIL);
3132 :
3133 40327 : found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3134 : remoteslot, &localslot);
3135 :
3136 : /* If found delete it. */
3137 40327 : if (found)
3138 : {
3139 : /*
3140 : * Report the conflict if the tuple was modified by a different
3141 : * origin.
3142 : */
3143 40318 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3144 6 : &conflicttuple.origin, &conflicttuple.ts) &&
3145 6 : conflicttuple.origin != replorigin_xact_state.origin)
3146 : {
3147 5 : conflicttuple.slot = localslot;
3148 5 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
3149 : remoteslot, NULL,
3150 : list_make1(&conflicttuple));
3151 : }
3152 :
3153 40318 : EvalPlanQualSetSlot(&epqstate, localslot);
3154 :
3155 : /* Do the actual delete. */
3156 40318 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3157 40318 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3158 : }
3159 : else
3160 : {
3161 : /*
3162 : * The tuple to be deleted could not be found. Do nothing except for
3163 : * emitting a log message.
3164 : */
3165 9 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
3166 : remoteslot, NULL, list_make1(&conflicttuple));
3167 : }
3168 :
3169 : /* Cleanup. */
3170 40327 : EvalPlanQualEnd(&epqstate);
3171 : }
3172 :
3173 : /*
3174 : * Try to find a tuple received from the publication side (in 'remoteslot') in
3175 : * the corresponding local relation using either replica identity index,
3176 : * primary key, index or if needed, sequential scan.
3177 : *
3178 : * Local tuple, if found, is returned in '*localslot'.
     : *
     : * Returns true if a matching tuple was found; the tuple, when found, is
     : * locked with LockTupleExclusive.  Also verifies SELECT privilege on the
     : * local relation, since the lookup is a read regardless of the caller's
     : * top-level operation.
3179 : */
3180 : static bool
3181 72275 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
3182 : LogicalRepRelation *remoterel,
3183 : Oid localidxoid,
3184 : TupleTableSlot *remoteslot,
3185 : TupleTableSlot **localslot)
3186 : {
3187 72275 : EState *estate = edata->estate;
3188 : bool found;
3189 :
3190 : /*
3191 : * Regardless of the top-level operation, we're performing a read here, so
3192 : * check for SELECT privileges.
3193 : */
3194 72275 : TargetPrivilegesCheck(localrel, ACL_SELECT);
3195 :
3196 72268 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3197 :
3198 : Assert(OidIsValid(localidxoid) ||
3199 : (remoterel->replident == REPLICA_IDENTITY_FULL));
3200 :
3201 72268 : if (OidIsValid(localidxoid))
3202 : {
3203 : #ifdef USE_ASSERT_CHECKING
3204 : Relation idxrel = index_open(localidxoid, AccessShareLock);
3205 :
3206 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3207 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
3208 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
3209 : IsIndexUsableForReplicaIdentityFull(idxrel,
3210 : edata->targetRel->attrmap)));
3211 : index_close(idxrel, AccessShareLock);
3212 : #endif
3213 :
3214 72115 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
3215 : LockTupleExclusive,
3216 : remoteslot, *localslot);
3217 : }
3218 : else
3219 153 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
3220 : remoteslot, *localslot);
3221 :
3222 72268 : return found;
3223 : }
3224 :
3225 : /*
3226 : * Determine whether the index can reliably locate the deleted tuple in the
3227 : * local relation.
3228 : *
3229 : * An index may exclude deleted tuples if it was re-indexed or re-created during
3230 : * change application. Therefore, an index is considered usable only if the
3231 : * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3232 : * index tuple's xmin. This ensures that any tuples deleted prior to the index
3233 : * creation or re-indexing are not relevant for conflict detection in the
3234 : * current apply worker.
3235 : *
3236 : * Note that indexes may also be excluded if they were modified by other DDL
3237 : * operations, such as ALTER INDEX. However, this is acceptable, as the
3238 : * likelihood of such DDL changes coinciding with the need to scan dead
3239 : * tuples for the update_deleted is low.
     : *
     : * Returns true when the pg_index tuple's xmin precedes
     : * conflict_detection_xmin, i.e. the index predates the conflict horizon.
3240 : */
3241 : static bool
3242 1 : IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
3243 : TransactionId conflict_detection_xmin)
3244 : {
3245 : HeapTuple index_tuple;
3246 : TransactionId index_xmin;
3247 :
3248 1 : index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
3249 :
3250 1 : if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3251 0 : elog(ERROR, "cache lookup failed for index %u", localindexoid);
3252 :
3253 : /*
3254 : * No need to check for a frozen transaction ID, as
3255 : * TransactionIdPrecedes() manages it internally, treating it as falling
3256 : * behind the conflict_detection_xmin.
3257 : */
3258 1 : index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
3259 :
3260 1 : ReleaseSysCache(index_tuple);
3261 :
3262 1 : return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
3263 : }
3264 :
3265 : /*
3266 : * Attempts to locate a deleted tuple in the local relation that matches the
3267 : * values of the tuple received from the publication side (in 'remoteslot').
3268 : * The search is performed using either the replica identity index, primary
3269 : * key, other available index, or a sequential scan if necessary.
3270 : *
3271 : * Returns true if the deleted tuple is found. If found, the transaction ID,
3272 : * origin, and commit timestamp of the deletion are stored in '*delete_xid',
3273 : * '*delete_origin', and '*delete_time' respectively.
     : *
     : * Returns false immediately when dead-tuple retention is disabled for the
     : * subscription, commit-timestamp tracking is off, or the leader apply
     : * worker has stopped retaining conflict-detection information.
3274 : */
3275 : static bool
3276 11 : FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
3277 : TupleTableSlot *remoteslot,
3278 : TransactionId *delete_xid, ReplOriginId *delete_origin,
3279 : TimestampTz *delete_time)
3280 : {
3281 : TransactionId oldestxmin;
3282 :
3283 : /*
3284 : * Return false if either dead tuples are not retained or commit timestamp
3285 : * data is not available.
3286 : */
3287 11 : if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
3288 8 : return false;
3289 :
3290 : /*
3291 : * For conflict detection, we use the leader worker's
3292 : * oldest_nonremovable_xid value instead of invoking
3293 : * GetOldestNonRemovableTransactionId() or using the conflict detection
3294 : * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3295 : * identify tuples that were recently deleted. These deleted tuples are no
3296 : * longer visible to concurrent transactions. However, if a remote update
3297 : * matches such a tuple, we log an update_deleted conflict.
3298 : *
3299 : * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3300 : * transaction IDs older than oldest_nonremovable_xid, for our current
3301 : * purpose, it is acceptable to treat tuples deleted by transactions prior
3302 : * to oldest_nonremovable_xid as update_missing conflicts.
3303 : */
3304 3 : if (am_leader_apply_worker())
3305 : {
3306 3 : oldestxmin = MyLogicalRepWorker->oldest_nonremovable_xid;
3307 : }
3308 : else
3309 : {
3310 : LogicalRepWorker *leader;
3311 :
3312 : /*
3313 : * Obtain the information from the leader apply worker as only the
3314 : * leader manages oldest_nonremovable_xid (see
3315 : * maybe_advance_nonremovable_xid() for details).
3316 : */
3317 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
3318 0 : leader = logicalrep_worker_find(WORKERTYPE_APPLY,
3319 0 : MyLogicalRepWorker->subid, InvalidOid,
3320 : false);
3321 0 : if (!leader)
3322 : {
3323 0 : ereport(ERROR,
3324 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3325 : errmsg("could not detect conflict as the leader apply worker has exited")));
3326 : }
3327 :
3328 0 : SpinLockAcquire(&leader->relmutex);
3329 0 : oldestxmin = leader->oldest_nonremovable_xid;
3330 0 : SpinLockRelease(&leader->relmutex);
3331 0 : LWLockRelease(LogicalRepWorkerLock);
3332 : }
3333 :
3334 : /*
3335 : * Return false if the leader apply worker has stopped retaining
3336 : * information for detecting conflicts. This implies that update_deleted
3337 : * can no longer be reliably detected.
3338 : */
3339 3 : if (!TransactionIdIsValid(oldestxmin))
3340 0 : return false;
3341 :
     : /* Prefer an index scan when a usable, old-enough index exists. */
3342 4 : if (OidIsValid(localidxoid) &&
3343 1 : IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
3344 1 : return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
3345 : remoteslot, oldestxmin,
3346 : delete_xid, delete_origin,
3347 : delete_time);
3348 : else
3349 2 : return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
3350 : oldestxmin, delete_xid,
3351 : delete_origin, delete_time);
3352 : }
3353 :
3354 : /*
3355 : * This handles insert, update, delete on a partitioned table.
3356 : */
static void
apply_handle_tuple_routing(ApplyExecutionData *edata,
						   TupleTableSlot *remoteslot,
						   LogicalRepTupleData *newtup,
						   CmdType operation)
{
	EState	   *estate = edata->estate;
	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
	ResultRelInfo *relinfo = edata->targetRelInfo;
	Relation	parentrel = relinfo->ri_RelationDesc;
	ModifyTableState *mtstate;
	PartitionTupleRouting *proute;
	ResultRelInfo *partrelinfo;
	Relation	partrel;
	TupleTableSlot *remoteslot_part;
	TupleConversionMap *map;
	MemoryContext oldctx;
	LogicalRepRelMapEntry *part_entry = NULL;
	AttrMap    *attrmap = NULL;

	/* ModifyTableState is needed for ExecFindPartition(). */
	edata->mtstate = mtstate = makeNode(ModifyTableState);
	mtstate->ps.plan = NULL;
	mtstate->ps.state = estate;
	mtstate->operation = operation;
	mtstate->resultRelInfo = relinfo;

	/* ... as is PartitionTupleRouting. */
	edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);

	/*
	 * Find the partition to which the "search tuple" belongs.
	 */
	Assert(remoteslot != NULL);
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
									remoteslot, estate);
	Assert(partrelinfo != NULL);
	partrel = partrelinfo->ri_RelationDesc;

	/*
	 * Check for supported relkind.  We need this since partitions might be of
	 * unsupported relkinds; and the set of partitions can change, so checking
	 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
	 */
	CheckSubscriptionRelkind(partrel->rd_rel->relkind,
							 relmapentry->remoterel.relkind,
							 get_namespace_name(RelationGetNamespace(partrel)),
							 RelationGetRelationName(partrel));

	/*
	 * To perform any of the operations below, the tuple must match the
	 * partition's rowtype. Convert if needed or just copy, using a dedicated
	 * slot to store the tuple in any case.
	 */
	remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
	if (remoteslot_part == NULL)
		remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
	map = ExecGetRootToChildMap(partrelinfo, estate);
	if (map != NULL)
	{
		attrmap = map->attrMap;
		remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
												remoteslot_part);
	}
	else
	{
		remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
		slot_getallattrs(remoteslot_part);
	}
	MemoryContextSwitchTo(oldctx);

	/* Check if we can do the update or delete on the leaf partition. */
	if (operation == CMD_UPDATE || operation == CMD_DELETE)
	{
		/*
		 * Open a leaf-partition-specific map entry so that replica identity
		 * lookups below use the partition's own index and column mapping.
		 */
		part_entry = logicalrep_partition_open(relmapentry, partrel,
											   attrmap);
		check_relation_updatable(part_entry);
	}

	switch (operation)
	{
		case CMD_INSERT:
			apply_handle_insert_internal(edata, partrelinfo,
										 remoteslot_part);
			break;

		case CMD_DELETE:
			apply_handle_delete_internal(edata, partrelinfo,
										 remoteslot_part,
										 part_entry->localindexoid);
			break;

		case CMD_UPDATE:

			/*
			 * For UPDATE, depending on whether or not the updated tuple
			 * satisfies the partition's constraint, perform a simple UPDATE
			 * of the partition or move the updated tuple into a different
			 * suitable partition.
			 */
			{
				TupleTableSlot *localslot;
				ResultRelInfo *partrelinfo_new;
				Relation	partrel_new;
				bool		found;
				EPQState	epqstate;
				ConflictTupleInfo conflicttuple = {0};

				/* Get the matching local tuple from the partition. */
				found = FindReplTupleInLocalRel(edata, partrel,
												&part_entry->remoterel,
												part_entry->localindexoid,
												remoteslot_part, &localslot);
				if (!found)
				{
					ConflictType type;
					TupleTableSlot *newslot = localslot;

					/*
					 * Detecting whether the tuple was recently deleted or
					 * never existed is crucial to avoid misleading the user
					 * during conflict handling.
					 */
					if (FindDeletedTupleInLocalRel(partrel,
												   part_entry->localindexoid,
												   remoteslot_part,
												   &conflicttuple.xmin,
												   &conflicttuple.origin,
												   &conflicttuple.ts) &&
						conflicttuple.origin != replorigin_xact_state.origin)
						type = CT_UPDATE_DELETED;
					else
						type = CT_UPDATE_MISSING;

					/* Store the new tuple for conflict reporting */
					slot_store_data(newslot, part_entry, newtup);

					/*
					 * The tuple to be updated could not be found or was
					 * deleted.  Do nothing except for emitting a log message.
					 */
					ReportApplyConflict(estate, partrelinfo, LOG,
										type, remoteslot_part, newslot,
										list_make1(&conflicttuple));

					return;
				}

				/*
				 * Report the conflict if the tuple was modified by a
				 * different origin.
				 */
				if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
											&conflicttuple.origin,
											&conflicttuple.ts) &&
					conflicttuple.origin != replorigin_xact_state.origin)
				{
					TupleTableSlot *newslot;

					/* Store the new tuple for conflict reporting */
					newslot = table_slot_create(partrel, &estate->es_tupleTable);
					slot_store_data(newslot, part_entry, newtup);

					conflicttuple.slot = localslot;

					ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
										remoteslot_part, newslot,
										list_make1(&conflicttuple));
				}

				/*
				 * Apply the update to the local tuple, putting the result in
				 * remoteslot_part.
				 */
				oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
				slot_modify_data(remoteslot_part, localslot, part_entry,
								 newtup);
				MemoryContextSwitchTo(oldctx);

				EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);

				/*
				 * Does the updated tuple still satisfy the current
				 * partition's constraint?
				 */
				if (!partrel->rd_rel->relispartition ||
					ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
									   false))
				{
					/*
					 * Yes, so simply UPDATE the partition.  We don't call
					 * apply_handle_update_internal() here, which would
					 * normally do the following work, to avoid repeating some
					 * work already done above to find the local tuple in the
					 * partition.
					 */
					InitConflictIndexes(partrelinfo);

					EvalPlanQualSetSlot(&epqstate, remoteslot_part);
					TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
										  ACL_UPDATE);
					ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
											 localslot, remoteslot_part);
				}
				else
				{
					/* Move the tuple into the new partition. */

					/*
					 * New partition will be found using tuple routing, which
					 * can only occur via the parent table.  We might need to
					 * convert the tuple to the parent's rowtype.  Note that
					 * this is the tuple found in the partition, not the
					 * original search tuple received by this function.
					 */
					if (map)
					{
						TupleConversionMap *PartitionToRootMap =
							convert_tuples_by_name(RelationGetDescr(partrel),
												   RelationGetDescr(parentrel));

						remoteslot =
							execute_attr_map_slot(PartitionToRootMap->attrMap,
												  remoteslot_part, remoteslot);
					}
					else
					{
						remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
						slot_getallattrs(remoteslot);
					}

					/* Find the new partition. */
					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
					partrelinfo_new = ExecFindPartition(mtstate, relinfo,
														proute, remoteslot,
														estate);
					MemoryContextSwitchTo(oldctx);
					Assert(partrelinfo_new != partrelinfo);
					partrel_new = partrelinfo_new->ri_RelationDesc;

					/* Check that new partition also has supported relkind. */
					CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
											 relmapentry->remoterel.relkind,
											 get_namespace_name(RelationGetNamespace(partrel_new)),
											 RelationGetRelationName(partrel_new));

					/* DELETE old tuple found in the old partition. */
					EvalPlanQualSetSlot(&epqstate, localslot);
					TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
					ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);

					/* INSERT new tuple into the new partition. */

					/*
					 * Convert the replacement tuple to match the destination
					 * partition rowtype.
					 */
					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
					remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
					if (remoteslot_part == NULL)
						remoteslot_part = table_slot_create(partrel_new,
															&estate->es_tupleTable);
					map = ExecGetRootToChildMap(partrelinfo_new, estate);
					if (map != NULL)
					{
						remoteslot_part = execute_attr_map_slot(map->attrMap,
																remoteslot,
																remoteslot_part);
					}
					else
					{
						remoteslot_part = ExecCopySlot(remoteslot_part,
													   remoteslot);
						/*
						 * NOTE(review): the analogous copy branch near the
						 * top of this function deforms remoteslot_part here,
						 * not remoteslot; confirm whether this should be
						 * slot_getallattrs(remoteslot_part).
						 */
						slot_getallattrs(remoteslot);
					}
					MemoryContextSwitchTo(oldctx);
					apply_handle_insert_internal(edata, partrelinfo_new,
												 remoteslot_part);
				}

				EvalPlanQualEnd(&epqstate);
			}
			break;

		default:
			elog(ERROR, "unrecognized CmdType: %d", (int) operation);
			break;
	}
}
3647 :
3648 : /*
3649 : * Handle TRUNCATE message.
3650 : *
3651 : * TODO: FDW support
3652 : */
static void
apply_handle_truncate(StringInfo s)
{
	bool		cascade = false;
	bool		restart_seqs = false;
	List	   *remote_relids = NIL;
	List	   *remote_rels = NIL;
	List	   *rels = NIL;
	List	   *part_rels = NIL;
	List	   *relids = NIL;
	List	   *relids_logged = NIL;
	ListCell   *lc;
	LOCKMODE	lockmode = AccessExclusiveLock;

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
		return;

	begin_replication_step();

	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

	/*
	 * Open and lock every replicated relation named in the message, and
	 * collect its OID (and those of its partitions) for the actual truncate.
	 */
	foreach(lc, remote_relids)
	{
		LogicalRepRelId relid = lfirst_oid(lc);
		LogicalRepRelMapEntry *rel;

		rel = logicalrep_rel_open(relid, lockmode);
		if (!should_apply_changes_for_rel(rel))
		{
			/*
			 * The relation can't become interesting in the middle of the
			 * transaction so it's safe to unlock it.
			 */
			logicalrep_rel_close(rel, lockmode);
			continue;
		}

		remote_rels = lappend(remote_rels, rel);
		TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
		rels = lappend(rels, rel->localrel);
		relids = lappend_oid(relids, rel->localreloid);
		if (RelationIsLogicallyLogged(rel->localrel))
			relids_logged = lappend_oid(relids_logged, rel->localreloid);

		/*
		 * Truncate partitions if we got a message to truncate a partitioned
		 * table.
		 */
		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		{
			ListCell   *child;
			List	   *children = find_all_inheritors(rel->localreloid,
													   lockmode,
													   NULL);

			foreach(child, children)
			{
				Oid			childrelid = lfirst_oid(child);
				Relation	childrel;

				/* Already collected (e.g. the parent itself); skip. */
				if (list_member_oid(relids, childrelid))
					continue;

				/* find_all_inheritors already got lock */
				childrel = table_open(childrelid, NoLock);

				/*
				 * Ignore temp tables of other backends.  See similar code in
				 * ExecuteTruncate().
				 */
				if (RELATION_IS_OTHER_TEMP(childrel))
				{
					table_close(childrel, lockmode);
					continue;
				}

				TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
				rels = lappend(rels, childrel);
				part_rels = lappend(part_rels, childrel);
				relids = lappend_oid(relids, childrelid);
				/* Log this relation only if needed for logical decoding */
				if (RelationIsLogicallyLogged(childrel))
					relids_logged = lappend_oid(relids_logged, childrelid);
			}
		}
	}

	/*
	 * Even if we used CASCADE on the upstream primary we explicitly default
	 * to replaying changes without further cascading.  This might be later
	 * changeable with a user specified option.
	 *
	 * MySubscription->runasowner tells us whether we want to execute
	 * replication actions as the subscription owner; the last argument to
	 * TruncateGuts tells it whether we want to switch to the table owner.
	 * Those are exactly opposite conditions.
	 */
	ExecuteTruncateGuts(rels,
						relids,
						relids_logged,
						DROP_RESTRICT,
						restart_seqs,
						!MySubscription->runasowner);
	/* Release map entries; locks are held until transaction end. */
	foreach(lc, remote_rels)
	{
		LogicalRepRelMapEntry *rel = lfirst(lc);

		logicalrep_rel_close(rel, NoLock);
	}
	foreach(lc, part_rels)
	{
		Relation	rel = lfirst(lc);

		table_close(rel, NoLock);
	}

	end_replication_step();
}
3776 :
3777 :
3778 : /*
3779 : * Logical replication protocol message dispatcher.
3780 : */
void
apply_dispatch(StringInfo s)
{
	LogicalRepMsgType action = pq_getmsgbyte(s);
	LogicalRepMsgType saved_command;

	/*
	 * Set the current command being applied.  Since this function can be
	 * called recursively when applying spooled changes, save the current
	 * command.
	 */
	saved_command = apply_error_callback_arg.command;
	apply_error_callback_arg.command = action;

	/* Route the message to the handler for its protocol message type. */
	switch (action)
	{
		case LOGICAL_REP_MSG_BEGIN:
			apply_handle_begin(s);
			break;

		case LOGICAL_REP_MSG_COMMIT:
			apply_handle_commit(s);
			break;

		case LOGICAL_REP_MSG_INSERT:
			apply_handle_insert(s);
			break;

		case LOGICAL_REP_MSG_UPDATE:
			apply_handle_update(s);
			break;

		case LOGICAL_REP_MSG_DELETE:
			apply_handle_delete(s);
			break;

		case LOGICAL_REP_MSG_TRUNCATE:
			apply_handle_truncate(s);
			break;

		case LOGICAL_REP_MSG_RELATION:
			apply_handle_relation(s);
			break;

		case LOGICAL_REP_MSG_TYPE:
			apply_handle_type(s);
			break;

		case LOGICAL_REP_MSG_ORIGIN:
			apply_handle_origin(s);
			break;

		case LOGICAL_REP_MSG_MESSAGE:

			/*
			 * Logical replication does not use generic logical messages yet.
			 * Although, it could be used by other applications that use this
			 * output plugin.
			 */
			break;

		case LOGICAL_REP_MSG_STREAM_START:
			apply_handle_stream_start(s);
			break;

		case LOGICAL_REP_MSG_STREAM_STOP:
			apply_handle_stream_stop(s);
			break;

		case LOGICAL_REP_MSG_STREAM_ABORT:
			apply_handle_stream_abort(s);
			break;

		case LOGICAL_REP_MSG_STREAM_COMMIT:
			apply_handle_stream_commit(s);
			break;

		case LOGICAL_REP_MSG_BEGIN_PREPARE:
			apply_handle_begin_prepare(s);
			break;

		case LOGICAL_REP_MSG_PREPARE:
			apply_handle_prepare(s);
			break;

		case LOGICAL_REP_MSG_COMMIT_PREPARED:
			apply_handle_commit_prepared(s);
			break;

		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
			apply_handle_rollback_prepared(s);
			break;

		case LOGICAL_REP_MSG_STREAM_PREPARE:
			apply_handle_stream_prepare(s);
			break;

		default:
			ereport(ERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
	}

	/* Reset the current command */
	apply_error_callback_arg.command = saved_command;
}
3887 :
3888 : /*
3889 : * Figure out which write/flush positions to report to the walsender process.
3890 : *
3891 : * We can't simply report back the last LSN the walsender sent us because the
3892 : * local transaction might not yet be flushed to disk locally. Instead we
3893 : * build a list that associates local with remote LSNs for every commit. When
3894 : * reporting back the flush position to the sender we iterate that list and
3895 : * check which entries on it are already locally flushed. Those we can report
3896 : * as having been flushed.
3897 : *
3898 : * The have_pending_txes is true if there are outstanding transactions that
3899 : * need to be flushed.
3900 : */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
				   bool *have_pending_txes)
{
	dlist_mutable_iter iter;
	XLogRecPtr	local_flush = GetFlushRecPtr(NULL);

	*write = InvalidXLogRecPtr;
	*flush = InvalidXLogRecPtr;

	/*
	 * Walk the remote/local LSN pairs in commit order, discarding (and
	 * freeing) every entry whose local transaction is already flushed.
	 */
	dlist_foreach_modify(iter, &lsn_mapping)
	{
		FlushPosition *pos =
			dlist_container(FlushPosition, node, iter.cur);

		*write = pos->remote_end;

		if (pos->local_end <= local_flush)
		{
			*flush = pos->remote_end;
			dlist_delete(iter.cur);
			pfree(pos);
		}
		else
		{
			/*
			 * Don't want to uselessly iterate over the rest of the list which
			 * could potentially be long.  Instead get the last element and
			 * grab the write position from there.
			 */
			pos = dlist_tail_element(FlushPosition, node,
									 &lsn_mapping);
			*write = pos->remote_end;
			*have_pending_txes = true;
			return;
		}
	}

	/* List fully drained (or empty); pending iff anything remains. */
	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
}
3941 :
3942 : /*
3943 : * Store current remote/local lsn pair in the tracking list.
3944 : */
3945 : void
3946 570 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3947 : {
3948 : FlushPosition *flushpos;
3949 :
3950 : /*
3951 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3952 : * by the leader apply worker.
3953 : */
3954 570 : if (am_parallel_apply_worker())
3955 19 : return;
3956 :
3957 : /* Need to do this in permanent context */
3958 551 : MemoryContextSwitchTo(ApplyContext);
3959 :
3960 : /* Track commit lsn */
3961 551 : flushpos = palloc_object(FlushPosition);
3962 551 : flushpos->local_end = local_lsn;
3963 551 : flushpos->remote_end = remote_lsn;
3964 :
3965 551 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3966 551 : MemoryContextSwitchTo(ApplyMessageContext);
3967 : }
3968 :
3969 :
3970 : /* Update statistics of the worker. */
3971 : static void
3972 223075 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3973 : {
3974 223075 : MyLogicalRepWorker->last_lsn = last_lsn;
3975 223075 : MyLogicalRepWorker->last_send_time = send_time;
3976 223075 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3977 223075 : if (reply)
3978 : {
3979 2331 : MyLogicalRepWorker->reply_lsn = last_lsn;
3980 2331 : MyLogicalRepWorker->reply_time = send_time;
3981 : }
3982 223075 : }
3983 :
3984 : /*
3985 : * Apply main loop.
3986 : */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;
	TimeLineID	tli;
	ErrorContextCallback errcallback;
	RetainDeadTuplesData rdt_data = {0};

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/*
	 * This memory context is used for per-stream data when the streaming mode
	 * is enabled.  This context is reset on each stream stop.
	 */
	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
													"LogicalStreamingContext",
													ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/*
	 * Push apply error context callback.  Fields will be filled while applying
	 * a change.
	 */
	errcallback.callback = apply_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;
	apply_error_context_stack = error_context_stack;

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					/* No more data available right now. */
					break;
				}
				else if (len < 0)
				{
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					if (ConfigReloadPending)
					{
						ConfigReloadPending = false;
						ProcessConfigFile(PGC_SIGHUP);
					}

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					rdt_data.last_recv_time = last_recv_timestamp;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					initReadOnlyStringInfo(&s, buf, len);

					c = pq_getmsgbyte(&s);

					if (c == PqReplMsg_WALData)
					{
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						/* Apply the change carried in this WAL message. */
						apply_dispatch(&s);

						maybe_advance_nonremovable_xid(&rdt_data, false);
					}
					else if (c == PqReplMsg_Keepalive)
					{
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						send_feedback(last_received, reply_requested, false);

						maybe_advance_nonremovable_xid(&rdt_data, false);

						UpdateWorkerStats(last_received, timestamp, true);
					}
					else if (c == PqReplMsg_PrimaryStatusUpdate)
					{
						rdt_data.remote_lsn = pq_getmsgint64(&s);
						rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
						rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
						rdt_data.reply_time = pq_getmsgint64(&s);

						/*
						 * This should never happen, see
						 * ProcessStandbyPSRequestMessage. But if it happens
						 * due to a bug, we don't want to proceed as it can
						 * incorrectly advance oldest_nonremovable_xid.
						 */
						if (!XLogRecPtrIsValid(rdt_data.remote_lsn))
							elog(ERROR, "cannot get the latest WAL position from the publisher");

						maybe_advance_nonremovable_xid(&rdt_data, true);

						UpdateWorkerStats(last_received, rdt_data.reply_time, false);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		/* Reset the timestamp if no message was received */
		rdt_data.last_recv_time = 0;

		maybe_advance_nonremovable_xid(&rdt_data, false);

		if (!in_remote_transaction && !in_streamed_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/*
			 * Process any relations that are being synchronized in parallel
			 * and any newly added tables or sequences.
			 */
			ProcessSyncingRelations(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextReset(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
			break;

		/*
		 * Wait for more data or latch.  If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message).  Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		/*
		 * Ensure to wake up when it's possible to advance the non-removable
		 * transaction ID, or when the retention duration may have exceeded
		 * max_retention_duration.
		 */
		if (MySubscription->retentionactive)
		{
			if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
				rdt_data.xid_advance_interval)
				wait_time = Min(wait_time, rdt_data.xid_advance_interval);
			else if (MySubscription->maxretention > 0)
				wait_time = Min(wait_time, MySubscription->maxretention);
		}

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new.  If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server.  Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the primary anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from primary has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errcode(ERRCODE_CONNECTION_FAILURE),
							 errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);

			maybe_advance_nonremovable_xid(&rdt_data, false);

			/*
			 * Force reporting to ensure long idle periods don't lead to
			 * arbitrarily delayed stats.  Stats can only be reported outside
			 * of (implicit or explicit) transactions.  That shouldn't lead to
			 * stats being delayed for long, because transactions are either
			 * sent as a whole on commit or streamed.  Streamed transactions
			 * are spilled to disk and applied on commit.
			 */
			if (!IsTransactionState())
				pgstat_report_stat(true);
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;
	apply_error_context_stack = error_context_stack;

	/* All done */
	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
4296 :
4297 : /*
4298 : * Send a Standby Status Update message to server.
4299 : *
4300 : * 'recvpos' is the latest LSN we've received data to, force is set if we need
4301 : * to send a response to avoid timeouts.
4302 : */
4303 : static void
4304 33152 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
4305 : {
4306 : static StringInfo reply_message = NULL;
4307 : static TimestampTz send_time = 0;
4308 :
4309 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
4310 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
4311 :
4312 : XLogRecPtr writepos;
4313 : XLogRecPtr flushpos;
4314 : TimestampTz now;
4315 : bool have_pending_txes;
4316 :
4317 : /*
4318 : * If the user doesn't want status to be reported to the publisher, be
4319 : * sure to exit before doing anything at all.
4320 : */
4321 33152 : if (!force && wal_receiver_status_interval <= 0)
4322 15447 : return;
4323 :
4324 : /* It's legal to not pass a recvpos */
4325 33152 : if (recvpos < last_recvpos)
4326 0 : recvpos = last_recvpos;
4327 :
4328 33152 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4329 :
4330 : /*
4331 : * No outstanding transactions to flush, we can report the latest received
4332 : * position. This is important for synchronous replication.
4333 : */
4334 33152 : if (!have_pending_txes)
4335 29340 : flushpos = writepos = recvpos;
4336 :
4337 33152 : if (writepos < last_writepos)
4338 0 : writepos = last_writepos;
4339 :
4340 33152 : if (flushpos < last_flushpos)
4341 3761 : flushpos = last_flushpos;
4342 :
4343 33152 : now = GetCurrentTimestamp();
4344 :
4345 : /* if we've already reported everything we're good */
4346 33152 : if (!force &&
4347 32727 : writepos == last_writepos &&
4348 15755 : flushpos == last_flushpos &&
4349 15531 : !TimestampDifferenceExceeds(send_time, now,
4350 : wal_receiver_status_interval * 1000))
4351 15447 : return;
4352 17705 : send_time = now;
4353 :
4354 17705 : if (!reply_message)
4355 : {
4356 472 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4357 :
4358 472 : reply_message = makeStringInfo();
4359 472 : MemoryContextSwitchTo(oldctx);
4360 : }
4361 : else
4362 17233 : resetStringInfo(reply_message);
4363 :
4364 17705 : pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
4365 17705 : pq_sendint64(reply_message, recvpos); /* write */
4366 17705 : pq_sendint64(reply_message, flushpos); /* flush */
4367 17705 : pq_sendint64(reply_message, writepos); /* apply */
4368 17705 : pq_sendint64(reply_message, now); /* sendTime */
4369 17705 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
4370 :
4371 17705 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4372 : force,
4373 : LSN_FORMAT_ARGS(recvpos),
4374 : LSN_FORMAT_ARGS(writepos),
4375 : LSN_FORMAT_ARGS(flushpos));
4376 :
4377 17705 : walrcv_send(LogRepWorkerWalRcvConn,
4378 : reply_message->data, reply_message->len);
4379 :
4380 17704 : if (recvpos > last_recvpos)
4381 16972 : last_recvpos = recvpos;
4382 17704 : if (writepos > last_writepos)
4383 16976 : last_writepos = writepos;
4384 17704 : if (flushpos > last_flushpos)
4385 16802 : last_flushpos = flushpos;
4386 : }
4387 :
4388 : /*
4389 : * Attempt to advance the non-removable transaction ID.
4390 : *
4391 : * See comments atop worker.c for details.
4392 : */
4393 : static void
4394 253808 : maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
4395 : bool status_received)
4396 : {
4397 253808 : if (!can_advance_nonremovable_xid(rdt_data))
4398 232614 : return;
4399 :
4400 21194 : process_rdt_phase_transition(rdt_data, status_received);
4401 : }
4402 :
4403 : /*
4404 : * Preliminary check to determine if advancing the non-removable transaction ID
4405 : * is allowed.
4406 : */
4407 : static bool
4408 253808 : can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
4409 : {
4410 : /*
4411 : * It is sufficient to manage non-removable transaction ID for a
4412 : * subscription by the main apply worker to detect update_deleted reliably
4413 : * even for table sync or parallel apply workers.
4414 : */
4415 253808 : if (!am_leader_apply_worker())
4416 376 : return false;
4417 :
4418 : /* No need to advance if retaining dead tuples is not required */
4419 253432 : if (!MySubscription->retaindeadtuples)
4420 232238 : return false;
4421 :
4422 21194 : return true;
4423 : }
4424 :
4425 : /*
4426 : * Process phase transitions during the non-removable transaction ID
4427 : * advancement. See comments atop worker.c for details of the transition.
4428 : */
4429 : static void
4430 36287 : process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
4431 : bool status_received)
4432 : {
4433 36287 : switch (rdt_data->phase)
4434 : {
4435 233 : case RDT_GET_CANDIDATE_XID:
4436 233 : get_candidate_xid(rdt_data);
4437 233 : break;
4438 14995 : case RDT_REQUEST_PUBLISHER_STATUS:
4439 14995 : request_publisher_status(rdt_data);
4440 14995 : break;
4441 20940 : case RDT_WAIT_FOR_PUBLISHER_STATUS:
4442 20940 : wait_for_publisher_status(rdt_data, status_received);
4443 20940 : break;
4444 117 : case RDT_WAIT_FOR_LOCAL_FLUSH:
4445 117 : wait_for_local_flush(rdt_data);
4446 117 : break;
4447 1 : case RDT_STOP_CONFLICT_INFO_RETENTION:
4448 1 : stop_conflict_info_retention(rdt_data);
4449 1 : break;
4450 1 : case RDT_RESUME_CONFLICT_INFO_RETENTION:
4451 1 : resume_conflict_info_retention(rdt_data);
4452 0 : break;
4453 : }
4454 36286 : }
4455 :
/*
 * Workhorse for the RDT_GET_CANDIDATE_XID phase.
 *
 * Picks the oldest locally-running transaction ID as the candidate for
 * advancement and, if it differs from the current oldest_nonremovable_xid,
 * moves the state machine on to requesting the publisher's status.
 */
static void
get_candidate_xid(RetainDeadTuplesData *rdt_data)
{
	TransactionId oldest_running_xid;
	TimestampTz now;

	/*
	 * Use last_recv_time when applying changes in the loop to avoid
	 * unnecessary system time retrieval. If last_recv_time is not available,
	 * obtain the current timestamp.
	 */
	now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();

	/*
	 * Compute the candidate_xid and request the publisher status at most once
	 * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
	 * details on how this value is dynamically adjusted. This is to avoid
	 * using CPU and network resources without making much progress.
	 */
	if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
									rdt_data->xid_advance_interval))
		return;

	/*
	 * Immediately update the timer, even if the function returns later
	 * without setting candidate_xid due to inactivity on the subscriber. This
	 * avoids frequent calls to GetOldestActiveTransactionId.
	 */
	rdt_data->candidate_xid_time = now;

	/*
	 * Consider transactions in the current database, as only dead tuples from
	 * this database are required for conflict detection.
	 */
	oldest_running_xid = GetOldestActiveTransactionId(false, false);

	/*
	 * Oldest active transaction ID (oldest_running_xid) can't be behind any
	 * of its previously computed value.
	 */
	Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
										 oldest_running_xid));

	/* Return if the oldest_nonremovable_xid cannot be advanced */
	if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
							oldest_running_xid))
	{
		/* No progress is possible; back off the polling interval. */
		adjust_xid_advance_interval(rdt_data, false);
		return;
	}

	/* A new candidate exists; reset the interval so we poll promptly. */
	adjust_xid_advance_interval(rdt_data, true);

	rdt_data->candidate_xid = oldest_running_xid;
	rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;

	/* process the next phase */
	process_rdt_phase_transition(rdt_data, false);
}
4518 :
/*
 * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
 *
 * Sends a primary-status request to the publisher and moves the state
 * machine to wait for the reply.
 */
static void
request_publisher_status(RetainDeadTuplesData *rdt_data)
{
	/*
	 * The message buffer is allocated once in ApplyContext and reused across
	 * calls for the lifetime of the worker.
	 */
	static StringInfo request_message = NULL;

	if (!request_message)
	{
		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

		request_message = makeStringInfo();
		MemoryContextSwitchTo(oldctx);
	}
	else
		resetStringInfo(request_message);

	/*
	 * Send the current time to update the remote walsender's latest reply
	 * message received time.
	 */
	pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
	pq_sendint64(request_message, GetCurrentTimestamp());

	elog(DEBUG2, "sending publisher status request message");

	/* Send a request for the publisher status */
	walrcv_send(LogRepWorkerWalRcvConn,
				request_message->data, request_message->len);

	rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;

	/*
	 * Skip calling maybe_advance_nonremovable_xid() since further transition
	 * is possible only once we receive the publisher status message.
	 */
}
4557 :
/*
 * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
 *
 * Once the publisher's status arrives, decide whether all remote
 * transactions concurrent with our first request have finished; if so,
 * proceed to waiting for the local flush, otherwise request status again.
 */
static void
wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
						  bool status_received)
{
	/*
	 * Return if we have requested but not yet received the publisher status.
	 */
	if (!status_received)
		return;

	/*
	 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
	 * retaining conflict information for this worker.
	 */
	if (should_stop_conflict_info_retention(rdt_data))
	{
		rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
		return;
	}

	/* Latch the xid horizon to wait for on the first status reply. */
	if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
		rdt_data->remote_wait_for = rdt_data->remote_nextxid;

	/*
	 * Check if all remote concurrent transactions that were active at the
	 * first status request have now completed. If completed, proceed to the
	 * next phase; otherwise, continue checking the publisher status until
	 * these transactions finish.
	 *
	 * It's possible that transactions in the commit phase during the last
	 * cycle have now finished committing, but remote_oldestxid remains older
	 * than remote_wait_for. This can happen if some old transaction came in
	 * the commit phase when we requested status in this cycle. We do not
	 * handle this case explicitly as it's rare and the benefit doesn't
	 * justify the required complexity. Tracking would require either caching
	 * all xids at the publisher or sending them to subscribers. The condition
	 * will resolve naturally once the remaining transactions are finished.
	 *
	 * Directly advancing the non-removable transaction ID is possible if
	 * there are no activities on the publisher since the last advancement
	 * cycle. However, it requires maintaining two fields, last_remote_nextxid
	 * and last_remote_lsn, within the structure for comparison with the
	 * current cycle's values. Considering the minimal cost of continuing in
	 * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
	 * advance the transaction ID here.
	 */
	if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
										  rdt_data->remote_oldestxid))
		rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
	else
		rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;

	/* process the next phase */
	process_rdt_phase_transition(rdt_data, false);
}
4616 :
/*
 * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
 *
 * Waits until everything up to the publisher-reported remote_lsn has been
 * applied and flushed locally, then publishes candidate_xid as this worker's
 * new oldest_nonremovable_xid and restarts the advancement cycle.
 */
static void
wait_for_local_flush(RetainDeadTuplesData *rdt_data)
{
	Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) &&
		   TransactionIdIsValid(rdt_data->candidate_xid));

	/*
	 * We expect the publisher and subscriber clocks to be in sync using time
	 * sync service like NTP. Otherwise, we will advance this worker's
	 * oldest_nonremovable_xid prematurely, leading to the removal of rows
	 * required to detect update_deleted reliably. This check primarily
	 * addresses scenarios where the publisher's clock falls behind; if the
	 * publisher's clock is ahead, subsequent transactions will naturally bear
	 * later commit timestamps, conforming to the design outlined atop
	 * worker.c.
	 *
	 * XXX Consider waiting for the publisher's clock to catch up with the
	 * subscriber's before proceeding to the next phase.
	 */
	if (TimestampDifferenceExceeds(rdt_data->reply_time,
								   rdt_data->candidate_xid_time, 0))
		ereport(ERROR,
				errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
				errdetail_internal("The clock on the publisher is behind that of the subscriber."));

	/*
	 * Do not attempt to advance the non-removable transaction ID when table
	 * sync is in progress. During this time, changes from a single
	 * transaction may be applied by multiple table sync workers corresponding
	 * to the target tables. So, it's necessary for all table sync workers to
	 * apply and flush the corresponding changes before advancing the
	 * transaction ID, otherwise, dead tuples that are still needed for
	 * conflict detection in table sync workers could be removed prematurely.
	 * However, confirming the apply and flush progress across all table sync
	 * workers is complex and not worth the effort, so we simply return if not
	 * all tables are in the READY state.
	 *
	 * Advancing the transaction ID is necessary even when no tables are
	 * currently subscribed, to avoid retaining dead tuples unnecessarily.
	 * While it might seem safe to skip all phases and directly assign
	 * candidate_xid to oldest_nonremovable_xid during the
	 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
	 * concurrently add tables to the subscription, the apply worker may not
	 * process invalidations in time. Consequently,
	 * HasSubscriptionTablesCached() might miss the new tables, leading to
	 * premature advancement of oldest_nonremovable_xid.
	 *
	 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
	 * invalidations are guaranteed to be processed before applying changes
	 * from newly added tables while waiting for the local flush to reach
	 * remote_lsn.
	 *
	 * Additionally, even if we check for subscription tables during
	 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
	 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
	 * subscription tables at this stage to prevent unnecessary tuple
	 * retention.
	 */
	if (HasSubscriptionTablesCached() && !AllTablesyncsReady())
	{
		TimestampTz now;

		now = rdt_data->last_recv_time
			? rdt_data->last_recv_time : GetCurrentTimestamp();

		/*
		 * Record the time spent waiting for table sync, it is needed for the
		 * timeout check in should_stop_conflict_info_retention().
		 */
		rdt_data->table_sync_wait_time =
			TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);

		return;
	}

	/*
	 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
	 * retaining conflict information for this worker.
	 */
	if (should_stop_conflict_info_retention(rdt_data))
	{
		rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
		return;
	}

	/*
	 * Update and check the remote flush position if we are applying changes
	 * in a loop. This is done at most once per WalWriterDelay to avoid
	 * performing costly operations in get_flush_position() too frequently
	 * during change application.
	 */
	if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
		TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
								   rdt_data->last_recv_time, WalWriterDelay))
	{
		XLogRecPtr	writepos;
		XLogRecPtr	flushpos;
		bool		have_pending_txes;

		/* Fetch the latest remote flush position */
		get_flush_position(&writepos, &flushpos, &have_pending_txes);

		if (flushpos > last_flushpos)
			last_flushpos = flushpos;

		rdt_data->flushpos_update_time = rdt_data->last_recv_time;
	}

	/* Return to wait for the changes to be applied */
	if (last_flushpos < rdt_data->remote_lsn)
		return;

	/*
	 * Reaching this point implies should_stop_conflict_info_retention()
	 * returned false earlier, meaning that the most recent duration for
	 * advancing the non-removable transaction ID is within the
	 * max_retention_duration or max_retention_duration is set to 0.
	 *
	 * Therefore, if conflict info retention was previously stopped due to a
	 * timeout, it is now safe to resume retention.
	 */
	if (!MySubscription->retentionactive)
	{
		rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
		return;
	}

	/*
	 * Reaching here means the remote WAL position has been received, and all
	 * transactions up to that position on the publisher have been applied and
	 * flushed locally. So, we can advance the non-removable transaction ID.
	 */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
		 LSN_FORMAT_ARGS(rdt_data->remote_lsn),
		 rdt_data->candidate_xid);

	/* Notify launcher to update the xmin of the conflict slot */
	ApplyLauncherWakeup();

	/* Start the next advancement round from a clean slate. */
	reset_retention_data_fields(rdt_data);

	/* process the next phase */
	process_rdt_phase_transition(rdt_data, false);
}
4768 :
4769 : /*
4770 : * Check whether conflict information retention should be stopped due to
4771 : * exceeding the maximum wait time (max_retention_duration).
4772 : *
4773 : * If retention should be stopped, return true. Otherwise, return false.
4774 : */
4775 : static bool
4776 15092 : should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4777 : {
4778 : TimestampTz now;
4779 :
4780 : Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4781 : Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
4782 : rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
4783 :
4784 15092 : if (!MySubscription->maxretention)
4785 15091 : return false;
4786 :
4787 : /*
4788 : * Use last_recv_time when applying changes in the loop to avoid
4789 : * unnecessary system time retrieval. If last_recv_time is not available,
4790 : * obtain the current timestamp.
4791 : */
4792 1 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4793 :
4794 : /*
4795 : * Return early if the wait time has not exceeded the configured maximum
4796 : * (max_retention_duration). Time spent waiting for table synchronization
4797 : * is excluded from this calculation, as it occurs infrequently.
4798 : */
4799 1 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4800 1 : MySubscription->maxretention +
4801 1 : rdt_data->table_sync_wait_time))
4802 0 : return false;
4803 :
4804 1 : return true;
4805 : }
4806 :
/*
 * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
 *
 * Marks retention as stopped in the catalog, invalidates this worker's
 * oldest_nonremovable_xid, and resets the state machine so that resuming
 * retention can be retried later.
 */
static void
stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
{
	/* Stop retention if not yet */
	if (MySubscription->retentionactive)
	{
		/*
		 * If the retention status cannot be updated (e.g., due to active
		 * transaction), skip further processing to avoid inconsistent
		 * retention behavior.
		 */
		if (!update_retention_status(false))
			return;

		/* Invalidate the xid under the mutex so readers see it atomically. */
		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
		MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
		SpinLockRelease(&MyLogicalRepWorker->relmutex);

		ereport(LOG,
				errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
					   MySubscription->name),
				errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
	}

	Assert(!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid));

	/*
	 * If retention has been stopped, reset to the initial phase to retry
	 * resuming retention. This reset is required to recalculate the current
	 * wait time and resume retention if the time falls within
	 * max_retention_duration.
	 */
	reset_retention_data_fields(rdt_data);
}
4844 :
/*
 * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
 *
 * Re-enables retention in the catalog and restarts the worker; this function
 * does not return on success (apply_worker_exit() calls proc_exit()).
 */
static void
resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
{
	/* We can't resume retention without updating retention status. */
	if (!update_retention_status(true))
		return;

	ereport(LOG,
			errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
				   MySubscription->name),
			MySubscription->maxretention
			? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
			: errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));

	/*
	 * Restart the worker to let the launcher initialize
	 * oldest_nonremovable_xid at startup.
	 *
	 * While it's technically possible to derive this value on-the-fly using
	 * the conflict detection slot's xmin, doing so risks a race condition:
	 * the launcher might clean slot.xmin just after retention resumes. This
	 * would make oldest_nonremovable_xid unreliable, especially during xid
	 * wraparound.
	 *
	 * Although this can be prevented by introducing heavy weight locking, the
	 * complexity it will bring doesn't seem worthwhile given how rarely
	 * retention is resumed.
	 */
	apply_worker_exit();
}
4878 :
/*
 * Updates pg_subscription.subretentionactive to the given value within a
 * new transaction.
 *
 * If already inside an active transaction, skips the update and returns
 * false.
 *
 * Returns true if the update is successfully performed.  On success, the
 * cached MySubscription->retentionactive is updated to match and the
 * launcher is woken to refresh the conflict slot.
 */
static bool
update_retention_status(bool active)
{
	/*
	 * Do not update the catalog during an active transaction. The transaction
	 * may be started during change application, leading to a possible
	 * rollback of catalog updates if the application fails subsequently.
	 */
	if (IsTransactionState())
		return false;

	StartTransactionCommand();

	/*
	 * Updating pg_subscription might involve TOAST table access, so ensure we
	 * have a valid snapshot.
	 */
	PushActiveSnapshot(GetTransactionSnapshot());

	/* Update pg_subscription.subretentionactive */
	UpdateDeadTupleRetentionStatus(MySubscription->oid, active);

	PopActiveSnapshot();
	CommitTransactionCommand();

	/* Notify launcher to update the conflict slot */
	ApplyLauncherWakeup();

	/* Keep the cached subscription in sync with the committed catalog row. */
	MySubscription->retentionactive = active;

	return true;
}
4920 :
4921 : /*
4922 : * Reset all data fields of RetainDeadTuplesData except those used to
4923 : * determine the timing for the next round of transaction ID advancement. We
4924 : * can even use flushpos_update_time in the next round to decide whether to get
4925 : * the latest flush position.
4926 : */
4927 : static void
4928 49 : reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
4929 : {
4930 49 : rdt_data->phase = RDT_GET_CANDIDATE_XID;
4931 49 : rdt_data->remote_lsn = InvalidXLogRecPtr;
4932 49 : rdt_data->remote_oldestxid = InvalidFullTransactionId;
4933 49 : rdt_data->remote_nextxid = InvalidFullTransactionId;
4934 49 : rdt_data->reply_time = 0;
4935 49 : rdt_data->remote_wait_for = InvalidFullTransactionId;
4936 49 : rdt_data->candidate_xid = InvalidTransactionId;
4937 49 : rdt_data->table_sync_wait_time = 0;
4938 49 : }
4939 :
/*
 * Adjust the interval for advancing non-removable transaction IDs.
 *
 * If there is no activity on the node or retention has been stopped, we
 * progressively double the interval used to advance non-removable transaction
 * ID. This helps conserve CPU and network resources when there's little benefit
 * to frequent updates.
 *
 * The interval is capped by the lowest of the following:
 * - wal_receiver_status_interval (if set and retention is active),
 * - a default maximum of 3 minutes,
 * - max_retention_duration (if retention is active).
 *
 * This ensures the interval never exceeds the retention boundary, even if other
 * limits are higher. Once activity resumes on the node and the retention is
 * active, the interval is reset to lesser of 100ms and max_retention_duration,
 * allowing timely advancement of non-removable transaction ID.
 *
 * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
 * consider the other interval or a separate GUC if the need arises.
 */
static void
adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
{
	/*
	 * NOTE(review): when retention is inactive AND no new XID was found,
	 * this first branch (which caps at wal_receiver_status_interval) takes
	 * precedence over the retention-stopped branch below; confirm that
	 * precedence matches the intent described in that branch's comment.
	 */
	if (rdt_data->xid_advance_interval && !new_xid_found)
	{
		int			max_interval = wal_receiver_status_interval
			? wal_receiver_status_interval * 1000
			: MAX_XID_ADVANCE_INTERVAL;

		/*
		 * No new transaction ID has been assigned since the last check, so
		 * double the interval, but not beyond the maximum allowable value.
		 */
		rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
											 max_interval);
	}
	else if (rdt_data->xid_advance_interval &&
			 !MySubscription->retentionactive)
	{
		/*
		 * Retention has been stopped, so double the interval-capped at a
		 * maximum of 3 minutes. The wal_receiver_status_interval is
		 * intentionally not used as a upper bound, since the likelihood of
		 * retention resuming is lower than that of general activity resuming.
		 */
		rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
											 MAX_XID_ADVANCE_INTERVAL);
	}
	else
	{
		/*
		 * A new transaction ID was found or the interval is not yet
		 * initialized, so set the interval to the minimum value.
		 */
		rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
	}

	/*
	 * Ensure the wait time remains within the maximum retention time limit
	 * when retention is active.
	 *
	 * NOTE(review): if max_retention_duration is 0 (unlimited) while
	 * retention is active, this Min() drives the interval to 0, effectively
	 * disabling the throttling above -- confirm whether this cap should
	 * apply only when maxretention is non-zero.
	 */
	if (MySubscription->retentionactive)
		rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
											 MySubscription->maxretention);
}
5006 :
/*
 * Exit routine for apply workers due to subscription parameter changes.
 *
 * Does not return for leader apply or table sync workers (calls proc_exit);
 * parallel apply workers return without exiting.
 */
static void
apply_worker_exit(void)
{
	if (am_parallel_apply_worker())
	{
		/*
		 * Don't stop the parallel apply worker as the leader will detect the
		 * subscription parameter change and restart logical replication later
		 * anyway. This also prevents the leader from reporting errors when
		 * trying to communicate with a stopped parallel apply worker, which
		 * would accidentally disable subscriptions if disable_on_error was
		 * set.
		 */
		return;
	}

	/*
	 * Reset the last-start time for this apply worker so that the launcher
	 * will restart it without waiting for wal_retrieve_retry_interval if the
	 * subscription is still active, and so that we won't leak that hash table
	 * entry if it isn't.
	 */
	if (am_leader_apply_worker())
		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

	proc_exit(0);
}
5037 :
/*
 * Reread subscription info if needed.
 *
 * For significant changes, we react by exiting the current process; a new
 * one will be launched afterwards if needed.
 *
 * On a benign change, the cached MySubscription is swapped for the freshly
 * loaded copy and session GUCs that mirror subscription options are updated.
 */
void
maybe_reread_subscription(void)
{
	Subscription *newsub;
	bool		started_tx = false;

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);

	if (newsub)
	{
		/* Keep the new subscription's memory alive across transactions. */
		MemoryContextSetParent(newsub->cxt, ApplyContext);
	}
	else
	{
		/*
		 * Exit if the subscription was removed. This normally should not
		 * happen as the worker gets killed during DROP SUBSCRIPTION.
		 */
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
						MySubscription->name)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (am_leader_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

		proc_exit(0);
	}

	/* Exit if the subscription was disabled. */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
						MySubscription->name)));

		apply_worker_exit();
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/* two-phase cannot be altered while the worker is running */
	Assert(newsub->twophasestate == MySubscription->twophasestate);

	/*
	 * Exit if any parameter that affects the remote connection was changed.
	 * The launcher will start a new worker but note that the parallel apply
	 * worker won't restart if the streaming option's value is changed from
	 * 'parallel' to any other value or the server decides not to stream the
	 * in-progress transaction.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
		strcmp(newsub->name, MySubscription->name) != 0 ||
		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
		newsub->binary != MySubscription->binary ||
		newsub->stream != MySubscription->stream ||
		newsub->passwordrequired != MySubscription->passwordrequired ||
		strcmp(newsub->origin, MySubscription->origin) != 0 ||
		newsub->owner != MySubscription->owner ||
		!equal(newsub->publications, MySubscription->publications))
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
							MySubscription->name)));
		else
			ereport(LOG,
					(errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
							MySubscription->name)));

		apply_worker_exit();
	}

	/*
	 * Exit if the subscription owner's superuser privileges have been
	 * revoked.
	 */
	if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
						   MySubscription->name));
		else
			ereport(LOG,
					errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
						   MySubscription->name));

		apply_worker_exit();
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	MemoryContextDelete(MySubscription->cxt);
	MySubscription = newsub;

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Change wal_receiver_timeout according to the user's wishes */
	set_wal_receiver_timeout();

	if (started_tx)
		CommitTransactionCommand();

	/* Mark the cache valid only after the swap fully succeeded. */
	MySubscriptionValid = true;
}
5170 :
5171 : /*
5172 : * Change wal_receiver_timeout to MySubscription->walrcvtimeout.
5173 : */
5174 : static void
5175 602 : set_wal_receiver_timeout(void)
5176 : {
5177 : bool parsed;
5178 : int val;
5179 602 : int prev_timeout = wal_receiver_timeout;
5180 :
5181 : /*
5182 : * Set the wal_receiver_timeout GUC to MySubscription->walrcvtimeout,
5183 : * which comes from the subscription's wal_receiver_timeout option. If the
5184 : * value is -1, reset the GUC to its default, meaning it will inherit from
5185 : * the server config, command line, or role/database settings.
5186 : */
5187 602 : parsed = parse_int(MySubscription->walrcvtimeout, &val, 0, NULL);
5188 602 : if (parsed && val == -1)
5189 602 : SetConfigOption("wal_receiver_timeout", NULL,
5190 : PGC_BACKEND, PGC_S_SESSION);
5191 : else
5192 0 : SetConfigOption("wal_receiver_timeout", MySubscription->walrcvtimeout,
5193 : PGC_BACKEND, PGC_S_SESSION);
5194 :
5195 : /*
5196 : * Log the wal_receiver_timeout setting (in milliseconds) as a debug
5197 : * message when it changes, to verify it was set correctly.
5198 : */
5199 602 : if (prev_timeout != wal_receiver_timeout)
5200 0 : elog(DEBUG1, "logical replication worker for subscription \"%s\" wal_receiver_timeout: %d ms",
5201 : MySubscription->name, wal_receiver_timeout);
5202 602 : }
5203 :
5204 : /*
5205 : * Callback from subscription syscache invalidation. Also needed for server or
5206 : * user mapping invalidation, which can change the connection information for
5207 : * subscriptions that connect using a server object.
5208 : */
5209 : static void
5210 93 : subscription_change_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
5211 : {
5212 93 : MySubscriptionValid = false;
5213 93 : }
5214 :
5215 : /*
5216 : * subxact_info_write
5217 : * Store information about subxacts for a toplevel transaction.
5218 : *
5219 : * For each subxact we store offset of its first change in the main file.
5220 : * The file is always over-written as a whole.
5221 : *
5222 : * XXX We should only store subxacts that were not aborted yet.
5223 : */
5224 : static void
5225 372 : subxact_info_write(Oid subid, TransactionId xid)
5226 : {
5227 : char path[MAXPGPATH];
5228 : Size len;
5229 : BufFile *fd;
5230 :
5231 : Assert(TransactionIdIsValid(xid));
5232 :
5233 : /* construct the subxact filename */
5234 372 : subxact_filename(path, subid, xid);
5235 :
5236 : /* Delete the subxacts file, if exists. */
5237 372 : if (subxact_data.nsubxacts == 0)
5238 : {
5239 290 : cleanup_subxact_info();
5240 290 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5241 :
5242 290 : return;
5243 : }
5244 :
5245 : /*
5246 : * Create the subxact file if it not already created, otherwise open the
5247 : * existing file.
5248 : */
5249 82 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
5250 : true);
5251 82 : if (fd == NULL)
5252 8 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
5253 :
5254 82 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
5255 :
5256 : /* Write the subxact count and subxact info */
5257 82 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
5258 82 : BufFileWrite(fd, subxact_data.subxacts, len);
5259 :
5260 82 : BufFileClose(fd);
5261 :
5262 : /* free the memory allocated for subxact info */
5263 82 : cleanup_subxact_info();
5264 : }
5265 :
5266 : /*
5267 : * subxact_info_read
5268 : * Restore information about subxacts of a streamed transaction.
5269 : *
5270 : * Read information about subxacts into the structure subxact_data that can be
5271 : * used later.
5272 : */
5273 : static void
5274 344 : subxact_info_read(Oid subid, TransactionId xid)
5275 : {
5276 : char path[MAXPGPATH];
5277 : Size len;
5278 : BufFile *fd;
5279 : MemoryContext oldctx;
5280 :
5281 : Assert(!subxact_data.subxacts);
5282 : Assert(subxact_data.nsubxacts == 0);
5283 : Assert(subxact_data.nsubxacts_max == 0);
5284 :
5285 : /*
5286 : * If the subxact file doesn't exist that means we don't have any subxact
5287 : * info.
5288 : */
5289 344 : subxact_filename(path, subid, xid);
5290 344 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
5291 : true);
5292 344 : if (fd == NULL)
5293 265 : return;
5294 :
5295 : /* read number of subxact items */
5296 79 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
5297 :
5298 79 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
5299 :
5300 : /* we keep the maximum as a power of 2 */
5301 79 : subxact_data.nsubxacts_max = 1 << pg_ceil_log2_32(subxact_data.nsubxacts);
5302 :
5303 : /*
5304 : * Allocate subxact information in the logical streaming context. We need
5305 : * this information during the complete stream so that we can add the sub
5306 : * transaction info to this. On stream stop we will flush this information
5307 : * to the subxact file and reset the logical streaming context.
5308 : */
5309 79 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
5310 79 : subxact_data.subxacts = palloc_array(SubXactInfo,
5311 : subxact_data.nsubxacts_max);
5312 79 : MemoryContextSwitchTo(oldctx);
5313 :
5314 79 : if (len > 0)
5315 79 : BufFileReadExact(fd, subxact_data.subxacts, len);
5316 :
5317 79 : BufFileClose(fd);
5318 : }
5319 :
5320 : /*
5321 : * subxact_info_add
5322 : * Add information about a subxact (offset in the main file).
5323 : */
5324 : static void
5325 102512 : subxact_info_add(TransactionId xid)
5326 : {
5327 102512 : SubXactInfo *subxacts = subxact_data.subxacts;
5328 : int64 i;
5329 :
5330 : /* We must have a valid top level stream xid and a stream fd. */
5331 : Assert(TransactionIdIsValid(stream_xid));
5332 : Assert(stream_fd != NULL);
5333 :
5334 : /*
5335 : * If the XID matches the toplevel transaction, we don't want to add it.
5336 : */
5337 102512 : if (stream_xid == xid)
5338 92388 : return;
5339 :
5340 : /*
5341 : * In most cases we're checking the same subxact as we've already seen in
5342 : * the last call, so make sure to ignore it (this change comes later).
5343 : */
5344 10124 : if (subxact_data.subxact_last == xid)
5345 10048 : return;
5346 :
5347 : /* OK, remember we're processing this XID. */
5348 76 : subxact_data.subxact_last = xid;
5349 :
5350 : /*
5351 : * Check if the transaction is already present in the array of subxact. We
5352 : * intentionally scan the array from the tail, because we're likely adding
5353 : * a change for the most recent subtransactions.
5354 : *
5355 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5356 : * would allow us to use binary search here.
5357 : */
5358 95 : for (i = subxact_data.nsubxacts; i > 0; i--)
5359 : {
5360 : /* found, so we're done */
5361 76 : if (subxacts[i - 1].xid == xid)
5362 57 : return;
5363 : }
5364 :
5365 : /* This is a new subxact, so we need to add it to the array. */
5366 19 : if (subxact_data.nsubxacts == 0)
5367 : {
5368 : MemoryContext oldctx;
5369 :
5370 8 : subxact_data.nsubxacts_max = 128;
5371 :
5372 : /*
5373 : * Allocate this memory for subxacts in per-stream context, see
5374 : * subxact_info_read.
5375 : */
5376 8 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
5377 8 : subxacts = palloc_array(SubXactInfo, subxact_data.nsubxacts_max);
5378 8 : MemoryContextSwitchTo(oldctx);
5379 : }
5380 11 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
5381 : {
5382 10 : subxact_data.nsubxacts_max *= 2;
5383 10 : subxacts = repalloc_array(subxacts, SubXactInfo,
5384 : subxact_data.nsubxacts_max);
5385 : }
5386 :
5387 19 : subxacts[subxact_data.nsubxacts].xid = xid;
5388 :
5389 : /*
5390 : * Get the current offset of the stream file and store it as offset of
5391 : * this subxact.
5392 : */
5393 19 : BufFileTell(stream_fd,
5394 19 : &subxacts[subxact_data.nsubxacts].fileno,
5395 19 : &subxacts[subxact_data.nsubxacts].offset);
5396 :
5397 19 : subxact_data.nsubxacts++;
5398 19 : subxact_data.subxacts = subxacts;
5399 : }
5400 :
/*
 * Format filename for the file containing the info about subxacts.  The name
 * includes both the subscription OID and the remote toplevel XID so that
 * workers handling remote transactions with the same XID don't interfere.
 */
static inline void
subxact_filename(char *path, Oid subid, TransactionId xid)
{
	snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
}
5407 :
/*
 * Format filename for the file containing serialized changes.  As with
 * subxact_filename, the name combines subscription OID and remote XID to
 * keep files of different workers/transactions distinct.
 */
static inline void
changes_filename(char *path, Oid subid, TransactionId xid)
{
	snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
}
5414 :
5415 : /*
5416 : * stream_cleanup_files
5417 : * Cleanup files for a subscription / toplevel transaction.
5418 : *
5419 : * Remove files with serialized changes and subxact info for a particular
5420 : * toplevel transaction. Each subscription has a separate set of files
5421 : * for any toplevel transaction.
5422 : */
5423 : void
5424 31 : stream_cleanup_files(Oid subid, TransactionId xid)
5425 : {
5426 : char path[MAXPGPATH];
5427 :
5428 : /* Delete the changes file. */
5429 31 : changes_filename(path, subid, xid);
5430 31 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
5431 :
5432 : /* Delete the subxact file, if it exists. */
5433 31 : subxact_filename(path, subid, xid);
5434 31 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5435 31 : }
5436 :
5437 : /*
5438 : * stream_open_file
5439 : * Open a file that we'll use to serialize changes for a toplevel
5440 : * transaction.
5441 : *
5442 : * Open a file for streamed changes from a toplevel transaction identified
5443 : * by stream_xid (global variable). If it's the first chunk of streamed
5444 : * changes for this transaction, create the buffile, otherwise open the
5445 : * previously created file.
5446 : */
5447 : static void
5448 363 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
5449 : {
5450 : char path[MAXPGPATH];
5451 : MemoryContext oldcxt;
5452 :
5453 : Assert(OidIsValid(subid));
5454 : Assert(TransactionIdIsValid(xid));
5455 : Assert(stream_fd == NULL);
5456 :
5457 :
5458 363 : changes_filename(path, subid, xid);
5459 363 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5460 :
5461 : /*
5462 : * Create/open the buffiles under the logical streaming context so that we
5463 : * have those files until stream stop.
5464 : */
5465 363 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
5466 :
5467 : /*
5468 : * If this is the first streamed segment, create the changes file.
5469 : * Otherwise, just open the file for writing, in append mode.
5470 : */
5471 363 : if (first_segment)
5472 32 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
5473 : path);
5474 : else
5475 : {
5476 : /*
5477 : * Open the file and seek to the end of the file because we always
5478 : * append the changes file.
5479 : */
5480 331 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
5481 : path, O_RDWR, false);
5482 331 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
5483 : }
5484 :
5485 363 : MemoryContextSwitchTo(oldcxt);
5486 363 : }
5487 :
5488 : /*
5489 : * stream_close_file
5490 : * Close the currently open file with streamed changes.
5491 : */
5492 : static void
5493 393 : stream_close_file(void)
5494 : {
5495 : Assert(stream_fd != NULL);
5496 :
5497 393 : BufFileClose(stream_fd);
5498 :
5499 393 : stream_fd = NULL;
5500 393 : }
5501 :
5502 : /*
5503 : * stream_write_change
5504 : * Serialize a change to a file for the current toplevel transaction.
5505 : *
5506 : * The change is serialized in a simple format, with length (not including
5507 : * the length), action code (identifying the message type) and message
5508 : * contents (without the subxact TransactionId value).
5509 : */
5510 : static void
5511 107553 : stream_write_change(char action, StringInfo s)
5512 : {
5513 : int len;
5514 :
5515 : Assert(stream_fd != NULL);
5516 :
5517 : /* total on-disk size, including the action type character */
5518 107553 : len = (s->len - s->cursor) + sizeof(char);
5519 :
5520 : /* first write the size */
5521 107553 : BufFileWrite(stream_fd, &len, sizeof(len));
5522 :
5523 : /* then the action */
5524 107553 : BufFileWrite(stream_fd, &action, sizeof(action));
5525 :
5526 : /* and finally the remaining part of the buffer (after the XID) */
5527 107553 : len = (s->len - s->cursor);
5528 :
5529 107553 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
5530 107553 : }
5531 :
5532 : /*
5533 : * stream_open_and_write_change
5534 : * Serialize a message to a file for the given transaction.
5535 : *
5536 : * This function is similar to stream_write_change except that it will open the
5537 : * target file if not already before writing the message and close the file at
5538 : * the end.
5539 : */
5540 : static void
5541 5 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
5542 : {
5543 : Assert(!in_streamed_transaction);
5544 :
5545 5 : if (!stream_fd)
5546 5 : stream_start_internal(xid, false);
5547 :
5548 5 : stream_write_change(action, s);
5549 5 : stream_stop_internal(xid);
5550 5 : }
5551 :
5552 : /*
5553 : * Sets streaming options including replication slot name and origin start
5554 : * position. Workers need these options for logical replication.
5555 : */
5556 : void
5557 472 : set_stream_options(WalRcvStreamOptions *options,
5558 : char *slotname,
5559 : XLogRecPtr *origin_startpos)
5560 : {
5561 : int server_version;
5562 :
5563 472 : options->logical = true;
5564 472 : options->startpoint = *origin_startpos;
5565 472 : options->slotname = slotname;
5566 :
5567 472 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
5568 472 : options->proto.logical.proto_version =
5569 472 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
5570 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
5571 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
5572 : LOGICALREP_PROTO_VERSION_NUM;
5573 :
5574 472 : options->proto.logical.publication_names = MySubscription->publications;
5575 472 : options->proto.logical.binary = MySubscription->binary;
5576 :
5577 : /*
5578 : * Assign the appropriate option value for streaming option according to
5579 : * the 'streaming' mode and the publisher's ability to support that mode.
5580 : */
5581 472 : if (server_version >= 160000 &&
5582 472 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
5583 : {
5584 438 : options->proto.logical.streaming_str = "parallel";
5585 438 : MyLogicalRepWorker->parallel_apply = true;
5586 : }
5587 34 : else if (server_version >= 140000 &&
5588 34 : MySubscription->stream != LOGICALREP_STREAM_OFF)
5589 : {
5590 26 : options->proto.logical.streaming_str = "on";
5591 26 : MyLogicalRepWorker->parallel_apply = false;
5592 : }
5593 : else
5594 : {
5595 8 : options->proto.logical.streaming_str = NULL;
5596 8 : MyLogicalRepWorker->parallel_apply = false;
5597 : }
5598 :
5599 472 : options->proto.logical.twophase = false;
5600 472 : options->proto.logical.origin = pstrdup(MySubscription->origin);
5601 472 : }
5602 :
5603 : /*
5604 : * Cleanup the memory for subxacts and reset the related variables.
5605 : */
5606 : static inline void
5607 376 : cleanup_subxact_info(void)
5608 : {
5609 376 : if (subxact_data.subxacts)
5610 87 : pfree(subxact_data.subxacts);
5611 :
5612 376 : subxact_data.subxacts = NULL;
5613 376 : subxact_data.subxact_last = InvalidTransactionId;
5614 376 : subxact_data.nsubxacts = 0;
5615 376 : subxact_data.nsubxacts_max = 0;
5616 376 : }
5617 :
5618 : /*
5619 : * Common function to run the apply loop with error handling. Disable the
5620 : * subscription, if necessary.
5621 : *
5622 : * Note that we don't handle FATAL errors which are probably because
5623 : * of system resource error and are not repeatable.
5624 : */
5625 : void
5626 472 : start_apply(XLogRecPtr origin_startpos)
5627 : {
5628 472 : PG_TRY();
5629 : {
5630 472 : LogicalRepApplyLoop(origin_startpos);
5631 : }
5632 117 : PG_CATCH();
5633 : {
5634 : /*
5635 : * Reset the origin state to prevent the advancement of origin
5636 : * progress if we fail to apply. Otherwise, this will result in
5637 : * transaction loss as that transaction won't be sent again by the
5638 : * server.
5639 : */
5640 117 : replorigin_xact_clear(true);
5641 :
5642 117 : if (MySubscription->disableonerr)
5643 3 : DisableSubscriptionAndExit();
5644 : else
5645 : {
5646 : /*
5647 : * Report the worker failed while applying changes. Abort the
5648 : * current transaction so that the stats message is sent in an
5649 : * idle state.
5650 : */
5651 114 : AbortOutOfAnyTransaction();
5652 114 : pgstat_report_subscription_error(MySubscription->oid);
5653 :
5654 114 : PG_RE_THROW();
5655 : }
5656 : }
5657 0 : PG_END_TRY();
5658 0 : }
5659 :
5660 : /*
5661 : * Runs the leader apply worker.
5662 : *
5663 : * It sets up replication origin, streaming options and then starts streaming.
5664 : */
5665 : static void
5666 324 : run_apply_worker(void)
5667 : {
5668 : char originname[NAMEDATALEN];
5669 324 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
5670 324 : char *slotname = NULL;
5671 : WalRcvStreamOptions options;
5672 : ReplOriginId originid;
5673 : TimeLineID startpointTLI;
5674 : char *err;
5675 : bool must_use_password;
5676 :
5677 324 : slotname = MySubscription->slotname;
5678 :
5679 : /*
5680 : * This shouldn't happen if the subscription is enabled, but guard against
5681 : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5682 : * slot is NULL.)
5683 : */
5684 324 : if (!slotname)
5685 0 : ereport(ERROR,
5686 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5687 : errmsg("subscription has no replication slot set")));
5688 :
5689 : /* Setup replication origin tracking. */
5690 324 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
5691 : originname, sizeof(originname));
5692 324 : StartTransactionCommand();
5693 324 : originid = replorigin_by_name(originname, true);
5694 324 : if (!OidIsValid(originid))
5695 0 : originid = replorigin_create(originname);
5696 324 : replorigin_session_setup(originid, 0);
5697 324 : replorigin_xact_state.origin = originid;
5698 324 : origin_startpos = replorigin_session_get_progress(false);
5699 324 : CommitTransactionCommand();
5700 :
5701 : /* Is the use of a password mandatory? */
5702 621 : must_use_password = MySubscription->passwordrequired &&
5703 297 : !MySubscription->ownersuperuser;
5704 :
5705 324 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
5706 : true, must_use_password,
5707 : MySubscription->name, &err);
5708 :
5709 312 : if (LogRepWorkerWalRcvConn == NULL)
5710 39 : ereport(ERROR,
5711 : (errcode(ERRCODE_CONNECTION_FAILURE),
5712 : errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5713 : MySubscription->name, err)));
5714 :
5715 : /*
5716 : * We don't really use the output identify_system for anything but it does
5717 : * some initializations on the upstream so let's still call it.
5718 : */
5719 273 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
5720 :
5721 273 : set_apply_error_context_origin(originname);
5722 :
5723 273 : set_stream_options(&options, slotname, &origin_startpos);
5724 :
5725 : /*
5726 : * Even when the two_phase mode is requested by the user, it remains as
5727 : * the tri-state PENDING until all tablesyncs have reached READY state.
5728 : * Only then, can it become ENABLED.
5729 : *
5730 : * Note: If the subscription has no tables then leave the state as
5731 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5732 : * work.
5733 : */
5734 289 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
5735 16 : AllTablesyncsReady())
5736 : {
5737 : /* Start streaming with two_phase enabled */
5738 9 : options.proto.logical.twophase = true;
5739 9 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5740 :
5741 9 : StartTransactionCommand();
5742 :
5743 : /*
5744 : * Updating pg_subscription might involve TOAST table access, so
5745 : * ensure we have a valid snapshot.
5746 : */
5747 9 : PushActiveSnapshot(GetTransactionSnapshot());
5748 :
5749 9 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
5750 9 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
5751 9 : PopActiveSnapshot();
5752 9 : CommitTransactionCommand();
5753 : }
5754 : else
5755 : {
5756 264 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5757 : }
5758 :
5759 273 : ereport(DEBUG1,
5760 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5761 : MySubscription->name,
5762 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
5763 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
5764 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
5765 : "?")));
5766 :
5767 : /* Run the main loop. */
5768 273 : start_apply(origin_startpos);
5769 0 : }
5770 :
5771 : /*
5772 : * Common initialization for leader apply worker, parallel apply worker,
5773 : * tablesync worker and sequencesync worker.
5774 : *
5775 : * Initialize the database connection, in-memory subscription and necessary
5776 : * config options.
5777 : */
5778 : void
5779 633 : InitializeLogRepWorker(void)
5780 : {
5781 : /* Run as replica session replication role. */
5782 633 : SetConfigOption("session_replication_role", "replica",
5783 : PGC_SUSET, PGC_S_OVERRIDE);
5784 :
5785 : /* Connect to our database. */
5786 633 : BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
5787 633 : MyLogicalRepWorker->userid,
5788 : 0);
5789 :
5790 : /*
5791 : * Set always-secure search path, so malicious users can't redirect user
5792 : * code (e.g. pg_index.indexprs).
5793 : */
5794 630 : SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5795 :
5796 630 : ApplyContext = AllocSetContextCreate(TopMemoryContext,
5797 : "ApplyContext",
5798 : ALLOCSET_DEFAULT_SIZES);
5799 :
5800 630 : StartTransactionCommand();
5801 :
5802 : /*
5803 : * Lock the subscription to prevent it from being concurrently dropped,
5804 : * then re-verify its existence. After the initialization, the worker will
5805 : * be terminated gracefully if the subscription is dropped.
5806 : */
5807 630 : LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
5808 : AccessShareLock);
5809 :
5810 628 : MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
5811 :
5812 628 : if (MySubscription)
5813 : {
5814 560 : MemoryContextSetParent(MySubscription->cxt, ApplyContext);
5815 : }
5816 : else
5817 : {
5818 68 : ereport(LOG,
5819 : (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5820 : MyLogicalRepWorker->subid)));
5821 :
5822 : /* Ensure we remove no-longer-useful entry for worker's start time */
5823 68 : if (am_leader_apply_worker())
5824 68 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5825 :
5826 68 : proc_exit(0);
5827 : }
5828 :
5829 560 : MySubscriptionValid = true;
5830 :
5831 560 : if (!MySubscription->enabled)
5832 : {
5833 0 : ereport(LOG,
5834 : (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5835 : MySubscription->name)));
5836 :
5837 0 : apply_worker_exit();
5838 : }
5839 :
5840 : /*
5841 : * Restart the worker if retain_dead_tuples was enabled during startup.
5842 : *
5843 : * At this point, the replication slot used for conflict detection might
5844 : * not exist yet, or could be dropped soon if the launcher perceives
5845 : * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5846 : * oldest_nonremovable_xid when the slot is absent or at risk of being
5847 : * dropped, a restart is initiated.
5848 : *
5849 : * The oldest_nonremovable_xid should be initialized only when the
5850 : * subscription's retention is active before launching the worker. See
5851 : * logicalrep_worker_launch.
5852 : */
5853 560 : if (am_leader_apply_worker() &&
5854 324 : MySubscription->retaindeadtuples &&
5855 15 : MySubscription->retentionactive &&
5856 15 : !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
5857 : {
5858 0 : ereport(LOG,
5859 : errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5860 : MySubscription->name, "retain_dead_tuples"));
5861 :
5862 0 : apply_worker_exit();
5863 : }
5864 :
5865 : /* Setup synchronous commit according to the user's wishes */
5866 560 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
5867 : PGC_BACKEND, PGC_S_OVERRIDE);
5868 :
5869 : /* Change wal_receiver_timeout according to the user's wishes */
5870 560 : set_wal_receiver_timeout();
5871 :
5872 : /*
5873 : * Keep us informed about subscription or role changes. Note that the
5874 : * role's superuser privilege can be revoked.
5875 : */
5876 560 : CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
5877 : subscription_change_cb,
5878 : (Datum) 0);
5879 : /* Changes to foreign servers may affect subscriptions using SERVER. */
5880 560 : CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
5881 : subscription_change_cb,
5882 : (Datum) 0);
5883 : /* Changes to user mappings may affect subscriptions using SERVER. */
5884 560 : CacheRegisterSyscacheCallback(USERMAPPINGOID,
5885 : subscription_change_cb,
5886 : (Datum) 0);
5887 :
5888 : /*
5889 : * Changes to FDW connection_function may affect subscriptions using
5890 : * SERVER.
5891 : */
5892 560 : CacheRegisterSyscacheCallback(FOREIGNDATAWRAPPEROID,
5893 : subscription_change_cb,
5894 : (Datum) 0);
5895 :
5896 560 : CacheRegisterSyscacheCallback(AUTHOID,
5897 : subscription_change_cb,
5898 : (Datum) 0);
5899 :
5900 560 : if (am_tablesync_worker())
5901 214 : ereport(LOG,
5902 : errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5903 : MySubscription->name,
5904 : get_rel_name(MyLogicalRepWorker->relid)));
5905 346 : else if (am_sequencesync_worker())
5906 9 : ereport(LOG,
5907 : errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
5908 : MySubscription->name));
5909 : else
5910 337 : ereport(LOG,
5911 : errmsg("logical replication apply worker for subscription \"%s\" has started",
5912 : MySubscription->name));
5913 :
5914 560 : CommitTransactionCommand();
5915 :
5916 : /*
5917 : * Register a callback to reset the origin state before aborting any
5918 : * pending transaction during shutdown (see ShutdownPostgres()). This will
5919 : * avoid origin advancement for an incomplete transaction which could
5920 : * otherwise lead to its loss as such a transaction won't be sent by the
5921 : * server again.
5922 : *
5923 : * Note that even a LOG or DEBUG statement placed after setting the origin
5924 : * state may process a shutdown signal before committing the current apply
5925 : * operation. So, it is important to register such a callback here.
5926 : *
5927 : * Register this callback here to ensure that all types of logical
5928 : * replication workers that set up origins and apply remote transactions
5929 : * are protected.
5930 : */
5931 560 : before_shmem_exit(on_exit_clear_xact_state, (Datum) 0);
5932 560 : }
5933 :
5934 : /*
5935 : * Callback on exit to clear transaction-level replication origin state.
5936 : */
5937 : static void
5938 560 : on_exit_clear_xact_state(int code, Datum arg)
5939 : {
5940 560 : replorigin_xact_clear(true);
5941 560 : }
5942 :
5943 : /*
5944 : * Common function to setup the leader apply, tablesync and sequencesync worker.
5945 : */
5946 : void
5947 620 : SetupApplyOrSyncWorker(int worker_slot)
5948 : {
5949 : /* Attach to slot */
5950 620 : logicalrep_worker_attach(worker_slot);
5951 :
5952 : Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker());
5953 :
5954 : /* Setup signal handling */
5955 620 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
5956 620 : BackgroundWorkerUnblockSignals();
5957 :
5958 : /*
5959 : * We don't currently need any ResourceOwner in a walreceiver process, but
5960 : * if we did, we could call CreateAuxProcessResourceOwner here.
5961 : */
5962 :
5963 : /* Initialise stats to a sanish value */
5964 620 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
5965 620 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
5966 :
5967 : /* Load the libpq-specific functions */
5968 620 : load_file("libpqwalreceiver", false);
5969 :
5970 620 : InitializeLogRepWorker();
5971 :
5972 : /* Connect to the origin and start the replication. */
5973 547 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5974 : MySubscription->conninfo);
5975 :
5976 : /*
5977 : * Setup callback for syscache so that we know when something changes in
5978 : * the subscription relation state.
5979 : */
5980 547 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
5981 : InvalidateSyncingRelStates,
5982 : (Datum) 0);
5983 547 : }
5984 :
/*
 * Logical Replication Apply worker entry point.
 *
 * main_arg carries the worker slot index.  InitializingApplyWorker brackets
 * the setup phase so other code can tell startup from steady-state apply.
 */
void
ApplyWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);

	InitializingApplyWorker = true;

	SetupApplyOrSyncWorker(worker_slot);

	InitializingApplyWorker = false;

	run_apply_worker();

	proc_exit(0);
}
6001 :
6002 : /*
6003 : * After error recovery, disable the subscription in a new transaction
6004 : * and exit cleanly.
6005 : */
6006 : void
6007 4 : DisableSubscriptionAndExit(void)
6008 : {
6009 : /*
6010 : * Emit the error message, and recover from the error state to an idle
6011 : * state
6012 : */
6013 4 : HOLD_INTERRUPTS();
6014 :
6015 4 : EmitErrorReport();
6016 4 : AbortOutOfAnyTransaction();
6017 4 : FlushErrorState();
6018 :
6019 4 : RESUME_INTERRUPTS();
6020 :
6021 : /*
6022 : * Report the worker failed during sequence synchronization, table
6023 : * synchronization, or apply.
6024 : */
6025 4 : pgstat_report_subscription_error(MyLogicalRepWorker->subid);
6026 :
6027 : /* Disable the subscription */
6028 4 : StartTransactionCommand();
6029 :
6030 : /*
6031 : * Updating pg_subscription might involve TOAST table access, so ensure we
6032 : * have a valid snapshot.
6033 : */
6034 4 : PushActiveSnapshot(GetTransactionSnapshot());
6035 :
6036 4 : DisableSubscription(MySubscription->oid);
6037 4 : PopActiveSnapshot();
6038 4 : CommitTransactionCommand();
6039 :
6040 : /* Ensure we remove no-longer-useful entry for worker's start time */
6041 4 : if (am_leader_apply_worker())
6042 3 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
6043 :
6044 : /* Notify the subscription has been disabled and exit */
6045 4 : ereport(LOG,
6046 : errmsg("subscription \"%s\" has been disabled because of an error",
6047 : MySubscription->name));
6048 :
6049 : /*
6050 : * Skip the track_commit_timestamp check when disabling the worker due to
6051 : * an error, as verifying commit timestamps is unnecessary in this
6052 : * context.
6053 : */
6054 4 : CheckSubDeadTupleRetention(false, true, WARNING,
6055 4 : MySubscription->retaindeadtuples,
6056 4 : MySubscription->retentionactive, false);
6057 :
6058 4 : proc_exit(0);
6059 : }
6060 :
6061 : /*
6062 : * Is current process a logical replication worker?
6063 : */
6064 : bool
6065 2760 : IsLogicalWorker(void)
6066 : {
6067 2760 : return MyLogicalRepWorker != NULL;
6068 : }
6069 :
6070 : /*
6071 : * Is current process a logical replication parallel apply worker?
6072 : */
6073 : bool
6074 2023 : IsLogicalParallelApplyWorker(void)
6075 : {
6076 2023 : return IsLogicalWorker() && am_parallel_apply_worker();
6077 : }
6078 :
6079 : /*
6080 : * Start skipping changes of the transaction if the given LSN matches the
6081 : * LSN specified by subscription's skiplsn.
6082 : */
6083 : static void
6084 586 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
6085 : {
6086 : Assert(!is_skipping_changes());
6087 : Assert(!in_remote_transaction);
6088 : Assert(!in_streamed_transaction);
6089 :
6090 : /*
6091 : * Quick return if it's not requested to skip this transaction. This
6092 : * function is called for every remote transaction and we assume that
6093 : * skipping the transaction is not used often.
6094 : */
6095 586 : if (likely(!XLogRecPtrIsValid(MySubscription->skiplsn) ||
6096 : MySubscription->skiplsn != finish_lsn))
6097 583 : return;
6098 :
6099 : /* Start skipping all changes of this transaction */
6100 3 : skip_xact_finish_lsn = finish_lsn;
6101 :
6102 3 : ereport(LOG,
6103 : errmsg("logical replication starts skipping transaction at LSN %X/%08X",
6104 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6105 : }
6106 :
6107 : /*
6108 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
6109 : */
6110 : static void
6111 30 : stop_skipping_changes(void)
6112 : {
6113 30 : if (!is_skipping_changes())
6114 27 : return;
6115 :
6116 3 : ereport(LOG,
6117 : errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6118 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6119 :
6120 : /* Stop skipping changes */
6121 3 : skip_xact_finish_lsn = InvalidXLogRecPtr;
6122 : }
6123 :
6124 : /*
6125 : * Clear subskiplsn of pg_subscription catalog.
6126 : *
6127 : * finish_lsn is the transaction's finish LSN that is used to check if the
6128 : * subskiplsn matches it. If not matched, we raise a warning when clearing the
6129 : * subskiplsn in order to inform users for cases e.g., where the user mistakenly
6130 : * specified the wrong subskiplsn.
6131 : */
6132 : static void
6133 555 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
6134 : {
6135 : Relation rel;
6136 : Form_pg_subscription subform;
6137 : HeapTuple tup;
6138 555 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
6139 555 : bool started_tx = false;
6140 :
6141 555 : if (likely(!XLogRecPtrIsValid(myskiplsn)) || am_parallel_apply_worker())
6142 552 : return;
6143 :
6144 3 : if (!IsTransactionState())
6145 : {
6146 1 : StartTransactionCommand();
6147 1 : started_tx = true;
6148 : }
6149 :
6150 : /*
6151 : * Updating pg_subscription might involve TOAST table access, so ensure we
6152 : * have a valid snapshot.
6153 : */
6154 3 : PushActiveSnapshot(GetTransactionSnapshot());
6155 :
6156 : /*
6157 : * Protect subskiplsn of pg_subscription from being concurrently updated
6158 : * while clearing it.
6159 : */
6160 3 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
6161 : AccessShareLock);
6162 :
6163 3 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
6164 :
6165 : /* Fetch the existing tuple. */
6166 3 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
6167 : ObjectIdGetDatum(MySubscription->oid));
6168 :
6169 3 : if (!HeapTupleIsValid(tup))
6170 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6171 :
6172 3 : subform = (Form_pg_subscription) GETSTRUCT(tup);
6173 :
6174 : /*
6175 : * Clear the subskiplsn. If the user has already changed subskiplsn before
6176 : * clearing it we don't update the catalog and the replication origin
6177 : * state won't get advanced. So in the worst case, if the server crashes
6178 : * before sending an acknowledgment of the flush position the transaction
6179 : * will be sent again and the user needs to set subskiplsn again. We can
6180 : * reduce the possibility by logging a replication origin WAL record to
6181 : * advance the origin LSN instead but there is no way to advance the
6182 : * origin timestamp and it doesn't seem to be worth doing anything about
6183 : * it since it's a very rare case.
6184 : */
6185 3 : if (subform->subskiplsn == myskiplsn)
6186 : {
6187 : bool nulls[Natts_pg_subscription];
6188 : bool replaces[Natts_pg_subscription];
6189 : Datum values[Natts_pg_subscription];
6190 :
6191 3 : memset(values, 0, sizeof(values));
6192 3 : memset(nulls, false, sizeof(nulls));
6193 3 : memset(replaces, false, sizeof(replaces));
6194 :
6195 : /* reset subskiplsn */
6196 3 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
6197 3 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
6198 :
6199 3 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
6200 : replaces);
6201 3 : CatalogTupleUpdate(rel, &tup->t_self, tup);
6202 :
6203 3 : if (myskiplsn != finish_lsn)
6204 0 : ereport(WARNING,
6205 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6206 : errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6207 : LSN_FORMAT_ARGS(finish_lsn),
6208 : LSN_FORMAT_ARGS(myskiplsn)));
6209 : }
6210 :
6211 3 : heap_freetuple(tup);
6212 3 : table_close(rel, NoLock);
6213 :
6214 3 : PopActiveSnapshot();
6215 :
6216 3 : if (started_tx)
6217 1 : CommitTransactionCommand();
6218 : }
6219 :
6220 : /* Error callback to give more context info about the change being applied */
6221 : void
6222 15684 : apply_error_callback(void *arg)
6223 : {
6224 15684 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
6225 :
6226 15684 : if (apply_error_callback_arg.command == 0)
6227 15240 : return;
6228 :
6229 : Assert(errarg->origin_name);
6230 :
6231 444 : if (errarg->rel == NULL)
6232 : {
6233 345 : if (!TransactionIdIsValid(errarg->remote_xid))
6234 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6235 : errarg->origin_name,
6236 : logicalrep_message_type(errarg->command));
6237 345 : else if (!XLogRecPtrIsValid(errarg->finish_lsn))
6238 264 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6239 : errarg->origin_name,
6240 : logicalrep_message_type(errarg->command),
6241 : errarg->remote_xid);
6242 : else
6243 162 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6244 : errarg->origin_name,
6245 : logicalrep_message_type(errarg->command),
6246 : errarg->remote_xid,
6247 81 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6248 : }
6249 : else
6250 : {
6251 99 : if (errarg->remote_attnum < 0)
6252 : {
6253 99 : if (!XLogRecPtrIsValid(errarg->finish_lsn))
6254 6 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6255 : errarg->origin_name,
6256 : logicalrep_message_type(errarg->command),
6257 3 : errarg->rel->remoterel.nspname,
6258 3 : errarg->rel->remoterel.relname,
6259 : errarg->remote_xid);
6260 : else
6261 192 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6262 : errarg->origin_name,
6263 : logicalrep_message_type(errarg->command),
6264 96 : errarg->rel->remoterel.nspname,
6265 96 : errarg->rel->remoterel.relname,
6266 : errarg->remote_xid,
6267 96 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6268 : }
6269 : else
6270 : {
6271 0 : if (!XLogRecPtrIsValid(errarg->finish_lsn))
6272 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6273 : errarg->origin_name,
6274 : logicalrep_message_type(errarg->command),
6275 0 : errarg->rel->remoterel.nspname,
6276 0 : errarg->rel->remoterel.relname,
6277 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6278 : errarg->remote_xid);
6279 : else
6280 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6281 : errarg->origin_name,
6282 : logicalrep_message_type(errarg->command),
6283 0 : errarg->rel->remoterel.nspname,
6284 0 : errarg->rel->remoterel.relname,
6285 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6286 : errarg->remote_xid,
6287 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6288 : }
6289 : }
6290 : }
6291 :
6292 : /* Set transaction information of apply error callback */
6293 : static inline void
6294 3001 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
6295 : {
6296 3001 : apply_error_callback_arg.remote_xid = xid;
6297 3001 : apply_error_callback_arg.finish_lsn = lsn;
6298 3001 : }
6299 :
6300 : /* Reset all information of apply error callback */
6301 : static inline void
6302 1454 : reset_apply_error_context_info(void)
6303 : {
6304 1454 : apply_error_callback_arg.command = 0;
6305 1454 : apply_error_callback_arg.rel = NULL;
6306 1454 : apply_error_callback_arg.remote_attnum = -1;
6307 1454 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
6308 1454 : }
6309 :
6310 : /*
6311 : * Request wakeup of the workers for the given subscription OID
6312 : * at commit of the current transaction.
6313 : *
6314 : * This is used to ensure that the workers process assorted changes
6315 : * as soon as possible.
6316 : */
6317 : void
6318 278 : LogicalRepWorkersWakeupAtCommit(Oid subid)
6319 : {
6320 : MemoryContext oldcxt;
6321 :
6322 278 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
6323 278 : on_commit_wakeup_workers_subids =
6324 278 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
6325 278 : MemoryContextSwitchTo(oldcxt);
6326 278 : }
6327 :
6328 : /*
6329 : * Wake up the workers of any subscriptions that were changed in this xact.
6330 : */
6331 : void
6332 626968 : AtEOXact_LogicalRepWorkers(bool isCommit)
6333 : {
6334 626968 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
6335 : {
6336 : ListCell *lc;
6337 :
6338 272 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
6339 544 : foreach(lc, on_commit_wakeup_workers_subids)
6340 : {
6341 272 : Oid subid = lfirst_oid(lc);
6342 : List *workers;
6343 : ListCell *lc2;
6344 :
6345 272 : workers = logicalrep_workers_find(subid, true, false);
6346 348 : foreach(lc2, workers)
6347 : {
6348 76 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
6349 :
6350 76 : logicalrep_worker_wakeup_ptr(worker);
6351 : }
6352 : }
6353 272 : LWLockRelease(LogicalRepWorkerLock);
6354 : }
6355 :
6356 : /* The List storage will be reclaimed automatically in xact cleanup. */
6357 626968 : on_commit_wakeup_workers_subids = NIL;
6358 626968 : }
6359 :
6360 : /*
6361 : * Allocate the origin name in long-lived context for error context message.
6362 : */
6363 : void
6364 485 : set_apply_error_context_origin(char *originname)
6365 : {
6366 485 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
6367 : originname);
6368 485 : }
6369 :
6370 : /*
6371 : * Return the action to be taken for the given transaction. See
6372 : * TransApplyAction for information on each of the actions.
6373 : *
6374 : * *winfo is assigned to the destination parallel worker info when the leader
6375 : * apply worker has to pass all the transaction's changes to the parallel
6376 : * apply worker.
6377 : */
6378 : static TransApplyAction
6379 347042 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
6380 : {
6381 347042 : *winfo = NULL;
6382 :
6383 347042 : if (am_parallel_apply_worker())
6384 : {
6385 68927 : return TRANS_PARALLEL_APPLY;
6386 : }
6387 :
6388 : /*
6389 : * If we are processing this transaction using a parallel apply worker
6390 : * then either we send the changes to the parallel worker or if the worker
6391 : * is busy then serialize the changes to the file which will later be
6392 : * processed by the parallel worker.
6393 : */
6394 278115 : *winfo = pa_find_worker(xid);
6395 :
6396 278115 : if (*winfo && (*winfo)->serialize_changes)
6397 : {
6398 5037 : return TRANS_LEADER_PARTIAL_SERIALIZE;
6399 : }
6400 273078 : else if (*winfo)
6401 : {
6402 68911 : return TRANS_LEADER_SEND_TO_PARALLEL;
6403 : }
6404 :
6405 : /*
6406 : * If there is no parallel worker involved to process this transaction
6407 : * then we either directly apply the change or serialize it to a file
6408 : * which will later be applied when the transaction finish message is
6409 : * processed.
6410 : */
6411 204167 : else if (in_streamed_transaction)
6412 : {
6413 103198 : return TRANS_LEADER_SERIALIZE;
6414 : }
6415 : else
6416 : {
6417 100969 : return TRANS_LEADER_APPLY;
6418 : }
6419 : }
|