Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server facing code and shares libpqwalreceiver
19 : * module with walreceiver for providing the libpq specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option as on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions has
33 : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which is then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in tmp file directory by default, and the filenames
38 : * include both the XID of the toplevel transaction and OID of the
39 : * subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID don't interfere.
41 : *
42 : * We use BufFiles instead of using normal temporary files because (a) the
43 : * BufFile infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) provides a way for automatic clean up on the error and (c) provides
45 : * a way to survive these files across local transactions and allows us to
46 : * open and close them at stream start and stop. We decided to use FileSet
47 : * infrastructure as without that it deletes the files on the closure of the
48 : * file and if we decide to keep stream files open across the start/stop stream
49 : * then it will consume a lot of memory (more than 8K for each BufFile and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we don't use FileSet then we also need to invent
53 : * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 : * the file we desired across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option as parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : * tablesync worker might not get such a prepared transaction because say it
72 : * was prior to the initial consistent point but might have got some later
73 : * commits. Now, the tablesync worker will exit without doing anything for the
74 : * prepared transaction skipped by the apply worker as the sync location for it
75 : * will be already ahead of the apply worker's current location. This would lead
76 : * to an "empty prepare", because later when the apply worker does the commit
77 : * prepare, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this, and similar prepare confusions the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * ProcessSyncingTablesForApply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING it starts streaming messages with the
98 : * two_phase option which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time we have not enabled two_phase, the
101 : * publisher (replication server) would have skipped some prepares but we
102 : * ensure that such prepares are sent along with commit prepare, see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * Finally, to avoid problems mentioned in previous paragraphs from any
113 : * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
114 : * to 'off' and then again back to 'on') there is a restriction for
115 : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 : * the two_phase tri-state is ENABLED, except when copy_data = false.
117 : *
118 : * We can get prepare of the same GID more than once for the genuine cases
119 : * where we have defined multiple subscriptions for publications on the same
120 : * server and prepared transaction has operations on tables subscribed to those
121 : * subscriptions. For such cases, if we use the GID sent by publisher one of
122 : * the prepares will be successful and others will fail, in which case the
123 : * server will send them again. Now, this can lead to a deadlock if user has
124 : * set synchronous_standby_names for all the subscriptions on subscriber. To
125 : * avoid such deadlocks, we generate a unique GID (consisting of the
126 : * subscription oid and the xid of the prepared transaction) for each prepare
127 : * transaction on the subscriber.
128 : *
129 : * FAILOVER
130 : * ----------------------
131 : * The logical slot on the primary can be synced to the standby by specifying
132 : * failover = true when creating the subscription. Enabling failover allows us
133 : * to smoothly transition to the promoted standby, ensuring that we can
134 : * subscribe to the new primary without losing any data.
135 : *
136 : * RETAIN DEAD TUPLES
137 : * ----------------------
138 : * Each apply worker that enabled retain_dead_tuples option maintains a
139 : * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
140 : * prevent dead rows from being removed prematurely when the apply worker still
141 : * needs them to detect update_deleted conflicts. Additionally, this helps to
142 : * retain the required commit_ts module information, which further helps to
143 : * detect update_origin_differs and delete_origin_differs conflicts reliably, as
144 : * otherwise, vacuum freeze could remove the required information.
145 : *
146 : * The logical replication launcher manages an internal replication slot named
147 : * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 : * transaction ID from all apply workers to determine the appropriate xmin for
149 : * the slot, thereby retaining necessary tuples.
150 : *
151 : * The non-removable transaction ID in the apply worker is advanced to the
152 : * oldest running transaction ID once all concurrent transactions on the
153 : * publisher have been applied and flushed locally. The process involves:
154 : *
155 : * - RDT_GET_CANDIDATE_XID:
156 : * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 : * candidate xid.
158 : *
159 : * - RDT_REQUEST_PUBLISHER_STATUS:
160 : * Send a message to the walsender requesting the publisher status, which
161 : * includes the latest WAL write position and information about transactions
162 : * that are in the commit phase.
163 : *
164 : * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 : * Wait for the status from the walsender. After receiving the first status,
166 : * do not proceed if there are concurrent remote transactions that are still
167 : * in the commit phase. These transactions might have been assigned an
168 : * earlier commit timestamp but have not yet written the commit WAL record.
169 : * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 : * until all these transactions have completed.
171 : *
172 : * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 : * Advance the non-removable transaction ID if the current flush location has
174 : * reached or surpassed the last received WAL position.
175 : *
176 : * - RDT_STOP_CONFLICT_INFO_RETENTION:
177 : * This phase is required only when max_retention_duration is defined. We
178 : * enter this phase if the wait time in either the
179 : * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
180 : * configured max_retention_duration. In this phase,
181 : * pg_subscription.subretentionactive is updated to false within a new
182 : * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
183 : *
184 : * - RDT_RESUME_CONFLICT_INFO_RETENTION:
185 : * This phase is required only when max_retention_duration is defined. We
186 : * enter this phase if the retention was previously stopped, and the time
187 : * required to advance the non-removable transaction ID in the
188 : * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
189 : * (or if max_retention_duration is set to 0). During this phase,
190 : * pg_subscription.subretentionactive is updated to true within a new
191 : * transaction, and the worker will be restarted.
192 : *
193 : * The overall state progression is: GET_CANDIDATE_XID ->
194 : * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
195 : * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
196 : * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
197 : *
198 : * Retaining the dead tuples for this period is sufficient for ensuring
199 : * eventual consistency using last-update-wins strategy, as dead tuples are
200 : * useful for detecting conflicts only during the application of concurrent
201 : * transactions from remote nodes. After applying and flushing all remote
202 : * transactions that occurred concurrently with the tuple DELETE, any
203 : * subsequent UPDATE from a remote node should have a later timestamp. In such
204 : * cases, it is acceptable to detect an update_missing scenario and convert the
205 : * UPDATE to an INSERT when applying it. But, for concurrent remote
206 : * transactions with earlier timestamps than the DELETE, detecting
207 : * update_deleted is necessary, as the UPDATEs in remote transactions should be
208 : * ignored if their timestamp is earlier than that of the dead tuples.
209 : *
210 : * Note that advancing the non-removable transaction ID is not supported if the
211 : * publisher is also a physical standby. This is because the logical walsender
212 : * on the standby can only get the WAL replay position but there may be more
213 : * WALs that are being replicated from the primary and those WALs could have
214 : * earlier commit timestamp.
215 : *
216 : * Similarly, when the publisher has subscribed to another publisher,
217 : * information necessary for conflict detection cannot be retained for
218 : * changes from origins other than the publisher. This is because publisher
219 : * lacks the information on concurrent transactions of other publishers to
220 : * which it subscribes. As the information on concurrent transactions is
221 : * unavailable beyond subscriber's immediate publishers, the non-removable
222 : * transaction ID might be advanced prematurely before changes from other
223 : * origins have been fully applied.
224 : *
225 : * XXX Retaining information for changes from other origins might be possible
226 : * by requesting the subscription on that origin to enable retain_dead_tuples
227 : * and fetching the conflict detection slot.xmin along with the publisher's
228 : * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
229 : * wait for the remote slot's xmin to reach the oldest active transaction ID,
230 : * ensuring that all transactions from other origins have been applied on the
231 : * publisher, thereby getting the latest WAL position that includes all
232 : * concurrent changes. However, this approach may impact performance, so it
233 : * might not be worth the effort.
234 : *
235 : * XXX It seems feasible to get the latest commit's WAL location from the
236 : * publisher and wait till that is applied. However, we can't do that
237 : * because commit timestamps can regress as a commit with a later LSN is not
238 : * guaranteed to have a later timestamp than those with earlier LSNs. Having
239 : * said that, even if that is possible, it won't improve performance much as
240 : * the apply always lags and moves slowly as compared with the transactions
241 : * on the publisher.
242 : *-------------------------------------------------------------------------
243 : */
244 :
245 : #include "postgres.h"
246 :
247 : #include <sys/stat.h>
248 : #include <unistd.h>
249 :
250 : #include "access/genam.h"
251 : #include "access/commit_ts.h"
252 : #include "access/table.h"
253 : #include "access/tableam.h"
254 : #include "access/tupconvert.h"
255 : #include "access/twophase.h"
256 : #include "access/xact.h"
257 : #include "catalog/indexing.h"
258 : #include "catalog/pg_inherits.h"
259 : #include "catalog/pg_subscription.h"
260 : #include "catalog/pg_subscription_rel.h"
261 : #include "commands/subscriptioncmds.h"
262 : #include "commands/tablecmds.h"
263 : #include "commands/trigger.h"
264 : #include "executor/executor.h"
265 : #include "executor/execPartition.h"
266 : #include "libpq/pqformat.h"
267 : #include "miscadmin.h"
268 : #include "optimizer/optimizer.h"
269 : #include "parser/parse_relation.h"
270 : #include "pgstat.h"
271 : #include "port/pg_bitutils.h"
272 : #include "postmaster/bgworker.h"
273 : #include "postmaster/interrupt.h"
274 : #include "postmaster/walwriter.h"
275 : #include "replication/conflict.h"
276 : #include "replication/logicallauncher.h"
277 : #include "replication/logicalproto.h"
278 : #include "replication/logicalrelation.h"
279 : #include "replication/logicalworker.h"
280 : #include "replication/origin.h"
281 : #include "replication/slot.h"
282 : #include "replication/walreceiver.h"
283 : #include "replication/worker_internal.h"
284 : #include "rewrite/rewriteHandler.h"
285 : #include "storage/buffile.h"
286 : #include "storage/ipc.h"
287 : #include "storage/latch.h"
288 : #include "storage/lmgr.h"
289 : #include "storage/procarray.h"
290 : #include "tcop/tcopprot.h"
291 : #include "utils/acl.h"
292 : #include "utils/guc.h"
293 : #include "utils/inval.h"
294 : #include "utils/lsyscache.h"
295 : #include "utils/memutils.h"
296 : #include "utils/pg_lsn.h"
297 : #include "utils/rel.h"
298 : #include "utils/rls.h"
299 : #include "utils/snapmgr.h"
300 : #include "utils/syscache.h"
301 : #include "utils/usercontext.h"
302 : #include "utils/wait_event.h"
303 :
#define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */

/*
 * Correspondence between a transaction's remote end LSN and the local LSN at
 * which it was applied.  Entries live in lsn_mapping; NOTE(review): they are
 * presumably consumed when sending feedback to the publisher (see
 * send_feedback() declared below) -- the usage is outside this chunk.
 */
typedef struct FlushPosition
{
	dlist_node	node;			/* membership in lsn_mapping */
	XLogRecPtr	local_end;		/* local end LSN of the applied transaction */
	XLogRecPtr	remote_end;		/* remote end LSN of the same transaction */
} FlushPosition;

/* List of FlushPosition entries recorded by this worker. */
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
314 :
/*
 * Executor state bundled together for applying a single remote change to a
 * target relation.
 */
typedef struct ApplyExecutionData
{
	EState	   *estate;			/* executor state, used to track resources */

	LogicalRepRelMapEntry *targetRel;	/* replication target rel */
	ResultRelInfo *targetRelInfo;	/* ResultRelInfo for same */

	/* These fields are used when the target relation is partitioned: */
	ModifyTableState *mtstate;	/* dummy ModifyTable state */
	PartitionTupleRouting *proute;	/* partition routing info */
} ApplyExecutionData;
326 :
/*
 * Struct for saving and restoring apply errcontext information.
 *
 * The xact-level fields are maintained through
 * set_apply_error_context_xact() / reset_apply_error_context_info()
 * (declared below).
 */
typedef struct ApplyErrorCallbackArg
{
	LogicalRepMsgType command;	/* 0 if invalid */
	LogicalRepRelMapEntry *rel; /* relation the change applies to, if any */

	/* Remote node information */
	int			remote_attnum;	/* -1 if invalid */
	TransactionId remote_xid;	/* remote transaction being applied */
	XLogRecPtr	finish_lsn;		/* finish LSN of that transaction */
	char	   *origin_name;	/* replication origin name of the remote node */
} ApplyErrorCallbackArg;
339 :
/*
 * The action to be taken for the changes in the transaction.
 *
 * The action for a given transaction is determined by
 * get_transaction_apply_action() (declared below).
 *
 * TRANS_LEADER_APPLY:
 * This action means that we are in the leader apply worker or table sync
 * worker. The changes of the transaction are either directly applied or
 * are read from temporary files (for streaming transactions) and then
 * applied by the worker.
 *
 * TRANS_LEADER_SERIALIZE:
 * This action means that we are in the leader apply worker or table sync
 * worker. Changes are written to temporary files and then applied when the
 * final commit arrives.
 *
 * TRANS_LEADER_SEND_TO_PARALLEL:
 * This action means that we are in the leader apply worker and need to send
 * the changes to the parallel apply worker.
 *
 * TRANS_LEADER_PARTIAL_SERIALIZE:
 * This action means that we are in the leader apply worker and have sent some
 * changes directly to the parallel apply worker and the remaining changes are
 * serialized to a file, due to timeout while sending data. The parallel apply
 * worker will apply these serialized changes when the final commit arrives.
 *
 * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
 * serializing changes, the leader worker also needs to serialize the
 * STREAM_XXX message to a file, and wait for the parallel apply worker to
 * finish the transaction when processing the transaction finish command. So
 * this new action was introduced to keep the code and logic clear.
 *
 * TRANS_PARALLEL_APPLY:
 * This action means that we are in the parallel apply worker and changes of
 * the transaction are applied directly by the worker.
 */
typedef enum
{
	/* The action for non-streaming transactions. */
	TRANS_LEADER_APPLY,

	/* Actions for streaming transactions. */
	TRANS_LEADER_SERIALIZE,
	TRANS_LEADER_SEND_TO_PARALLEL,
	TRANS_LEADER_PARTIAL_SERIALIZE,
	TRANS_PARALLEL_APPLY,
} TransApplyAction;
385 :
/*
 * The phases involved in advancing the non-removable transaction ID.
 *
 * See comments atop worker.c for details of the transition between these
 * phases.
 */
typedef enum
{
	RDT_GET_CANDIDATE_XID,		/* take oldest active xid as candidate */
	RDT_REQUEST_PUBLISHER_STATUS,	/* ask walsender for publisher status */
	RDT_WAIT_FOR_PUBLISHER_STATUS,	/* wait for that status to arrive */
	RDT_WAIT_FOR_LOCAL_FLUSH,	/* wait for local flush to reach remote_lsn */
	RDT_STOP_CONFLICT_INFO_RETENTION,	/* wait exceeded
										 * max_retention_duration */
	RDT_RESUME_CONFLICT_INFO_RETENTION, /* retention can be re-enabled */
} RetainDeadTuplesPhase;
401 :
/*
 * Critical information for managing phase transitions within the
 * RetainDeadTuplesPhase.
 *
 * A single instance of this struct is threaded through
 * maybe_advance_nonremovable_xid() and the per-phase handlers declared
 * below.
 */
typedef struct RetainDeadTuplesData
{
	RetainDeadTuplesPhase phase;	/* current phase */
	XLogRecPtr	remote_lsn;		/* WAL write position on the publisher */

	/*
	 * Oldest transaction ID that was in the commit phase on the publisher.
	 * Use FullTransactionId to prevent issues with transaction ID wraparound,
	 * where a new remote_oldestxid could falsely appear to originate from the
	 * past and block advancement.
	 */
	FullTransactionId remote_oldestxid;

	/*
	 * Next transaction ID to be assigned on the publisher. Use
	 * FullTransactionId for consistency and to allow straightforward
	 * comparisons with remote_oldestxid.
	 */
	FullTransactionId remote_nextxid;

	TimestampTz reply_time;		/* when the publisher responds with status */

	/*
	 * Publisher transaction ID that must be awaited to complete before
	 * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
	 * FullTransactionId for the same reason as remote_nextxid.
	 */
	FullTransactionId remote_wait_for;

	TransactionId candidate_xid;	/* candidate for the non-removable
									 * transaction ID */
	TimestampTz flushpos_update_time;	/* when the remote flush position was
										 * updated in final phase
										 * (RDT_WAIT_FOR_LOCAL_FLUSH) */

	long		table_sync_wait_time;	/* time spent waiting for table sync
										 * to finish */

	/*
	 * The following fields are used to determine the timing for the next
	 * round of transaction ID advancement.
	 */
	TimestampTz last_recv_time; /* when the last message was received */
	TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
	int			xid_advance_interval;	/* how much time (ms) to wait before
										 * attempting to advance the
										 * non-removable transaction ID */
} RetainDeadTuplesData;
454 :
/*
 * The minimum (100ms) and maximum (3 minutes) intervals for advancing
 * non-removable transaction IDs. The maximum interval is a bit arbitrary but
 * is sufficient to not cause any undue network traffic.
 */
#define MIN_XID_ADVANCE_INTERVAL 100
#define MAX_XID_ADVANCE_INTERVAL 180000

/* errcontext tracker */
static ApplyErrorCallbackArg apply_error_callback_arg =
{
	.command = 0,
	.rel = NULL,
	.remote_attnum = -1,
	.remote_xid = InvalidTransactionId,
	.finish_lsn = InvalidXLogRecPtr,
	.origin_name = NULL,
};

/* Error-context stack entry used while applying changes. */
ErrorContextCallback *apply_error_context_stack = NULL;

/*
 * Memory contexts for the apply worker.  NOTE(review): their creation and
 * reset points are outside this chunk; presumably ApplyMessageContext is
 * short-lived (per message) while ApplyContext persists -- confirm against
 * the rest of the file.
 */
MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;

/* per stream context for streaming transactions */
static MemoryContext LogicalStreamingContext = NULL;

/* Connection to the walsender on the publisher. */
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;

/* Cached subscription info, and whether that cache is still valid. */
Subscription *MySubscription = NULL;
static bool MySubscriptionValid = false;

/*
 * NOTE(review): presumably the subscription OIDs whose workers must be woken
 * at commit time; the list's producer/consumer are outside this chunk.
 */
static List *on_commit_wakeup_workers_subids = NIL;

/* True while we are inside a remote transaction's apply loop. */
bool in_remote_transaction = false;

/*
 * Finish LSN of the remote transaction currently being applied (set in
 * apply_handle_begin, per the notes atop this file).
 */
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

/* fields valid only when processing streamed transaction */
static bool in_streamed_transaction = false;

/* XID of the streamed transaction currently being processed. */
static TransactionId stream_xid = InvalidTransactionId;

/*
 * The number of changes applied by parallel apply worker during one streaming
 * block.
 */
static uint32 parallel_stream_nchanges = 0;

/* Are we initializing an apply worker? */
bool InitializingApplyWorker = false;

/*
 * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
 * the subscription if the remote transaction's finish LSN matches the subskiplsn.
 * Once we start skipping changes, we don't stop it until we skip all changes of
 * the transaction even if pg_subscription is updated and MySubscription->skiplsn
 * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
 * we don't skip receiving and spooling the changes since we decide whether or not
 * to skip applying the changes when starting to apply changes. The subskiplsn is
 * cleared after successfully skipping the transaction or applying non-empty
 * transaction. The latter prevents the mistakenly specified subskiplsn from
 * being left. Note that we cannot skip the streaming transactions when using
 * parallel apply workers because we cannot get the finish LSN before applying
 * the changes. So, we don't start parallel apply worker when finish LSN is set
 * by the user.
 */
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
#define is_skipping_changes() (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))

/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;

/*
 * The remote WAL position that has been applied and flushed locally. We record
 * and use this information both while sending feedback to the server and
 * advancing oldest_nonremovable_xid.
 */
static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
533 :
/*
 * Position of one sub-transaction inside the serialized changes file.  These
 * offsets let a subxact abort truncate the file back to the subxact's start
 * (see the notes on streamed transactions atop this file).
 */
typedef struct SubXactInfo
{
	TransactionId xid;			/* XID of the subxact */
	int			fileno;			/* file number in the buffile */
	pgoff_t		offset;			/* offset in the file */
} SubXactInfo;

/* Sub-transaction data for the current streaming transaction */
typedef struct ApplySubXactData
{
	uint32		nsubxacts;		/* number of sub-transactions */
	uint32		nsubxacts_max;	/* current capacity of subxacts */
	TransactionId subxact_last; /* xid of the last sub-transaction */
	SubXactInfo *subxacts;		/* sub-xact offset in changes file */
} ApplySubXactData;

/* State for the streaming transaction currently being spooled, if any. */
static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
551 :
552 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
553 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
554 :
555 : /*
556 : * Information about subtransactions of a given toplevel transaction.
557 : */
558 : static void subxact_info_write(Oid subid, TransactionId xid);
559 : static void subxact_info_read(Oid subid, TransactionId xid);
560 : static void subxact_info_add(TransactionId xid);
561 : static inline void cleanup_subxact_info(void);
562 :
563 : /*
564 : * Serialize and deserialize changes for a toplevel transaction.
565 : */
566 : static void stream_open_file(Oid subid, TransactionId xid,
567 : bool first_segment);
568 : static void stream_write_change(char action, StringInfo s);
569 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
570 : static void stream_close_file(void);
571 :
572 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
573 :
574 : static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
575 : bool status_received);
576 : static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
577 : static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
578 : bool status_received);
579 : static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
580 : static void request_publisher_status(RetainDeadTuplesData *rdt_data);
581 : static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
582 : bool status_received);
583 : static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
584 : static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
585 : static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
586 : static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data);
587 : static bool update_retention_status(bool active);
588 : static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
589 : static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
590 : bool new_xid_found);
591 :
592 : static void apply_worker_exit(void);
593 :
594 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
595 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
596 : ResultRelInfo *relinfo,
597 : TupleTableSlot *remoteslot);
598 : static void apply_handle_update_internal(ApplyExecutionData *edata,
599 : ResultRelInfo *relinfo,
600 : TupleTableSlot *remoteslot,
601 : LogicalRepTupleData *newtup,
602 : Oid localindexoid);
603 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
604 : ResultRelInfo *relinfo,
605 : TupleTableSlot *remoteslot,
606 : Oid localindexoid);
607 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
608 : LogicalRepRelation *remoterel,
609 : Oid localidxoid,
610 : TupleTableSlot *remoteslot,
611 : TupleTableSlot **localslot);
612 : static bool FindDeletedTupleInLocalRel(Relation localrel,
613 : Oid localidxoid,
614 : TupleTableSlot *remoteslot,
615 : TransactionId *delete_xid,
616 : ReplOriginId *delete_origin,
617 : TimestampTz *delete_time);
618 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
619 : TupleTableSlot *remoteslot,
620 : LogicalRepTupleData *newtup,
621 : CmdType operation);
622 :
623 : /* Functions for skipping changes */
624 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
625 : static void stop_skipping_changes(void);
626 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
627 :
628 : /* Functions for apply error callback */
629 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
630 : static inline void reset_apply_error_context_info(void);
631 :
632 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
633 : ParallelApplyWorkerInfo **winfo);
634 :
635 : static void set_wal_receiver_timeout(void);
636 :
637 : static void on_exit_clear_xact_state(int code, Datum arg);
638 :
639 : /*
640 : * Form the origin name for the subscription.
641 : *
642 : * This is a common function for tablesync and other workers. Tablesync workers
643 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
644 : *
645 : * Return the name in the supplied buffer.
646 : */
647 : void
648 1471 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
649 : char *originname, Size szoriginname)
650 : {
651 1471 : if (OidIsValid(relid))
652 : {
653 : /* Replication origin name for tablesync workers. */
654 809 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
655 : }
656 : else
657 : {
658 : /* Replication origin name for non-tablesync workers. */
659 662 : snprintf(originname, szoriginname, "pg_%u", suboid);
660 : }
661 1471 : }
662 :
663 : /*
664 : * Should this worker apply changes for given relation.
665 : *
666 : * This is mainly needed for initial relation data sync as that runs in
667 : * separate worker process running in parallel and we need some way to skip
668 : * changes coming to the leader apply worker during the sync of a table.
669 : *
670 : * Note we need to do smaller or equals comparison for SYNCDONE state because
671 : * it might hold position of end of initial slot consistent point WAL
672 : * record + 1 (ie start of next record) and next record can be COMMIT of
673 : * transaction we are now processing (which is what we set remote_final_lsn
674 : * to in apply_handle_begin).
675 : *
676 : * Note that for streaming transactions that are being applied in the parallel
677 : * apply worker, we disallow applying changes if the target table in the
678 : * subscription is not in the READY state, because we cannot decide whether to
679 : * apply the change as we won't know remote_final_lsn by that time.
680 : *
681 : * We already checked this in pa_can_start() before assigning the
682 : * streaming transaction to the parallel worker, but it also needs to be
683 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
684 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
685 : * while applying this transaction.
686 : */
687 : static bool
688 171840 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
689 : {
690 171840 : switch (MyLogicalRepWorker->type)
691 : {
692 3000 : case WORKERTYPE_TABLESYNC:
693 3000 : return MyLogicalRepWorker->relid == rel->localreloid;
694 :
695 68367 : case WORKERTYPE_PARALLEL_APPLY:
696 : /* We don't synchronize rel's that are in unknown state. */
697 68367 : if (rel->state != SUBREL_STATE_READY &&
698 0 : rel->state != SUBREL_STATE_UNKNOWN)
699 0 : ereport(ERROR,
700 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
701 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
702 : MySubscription->name),
703 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
704 :
705 68367 : return rel->state == SUBREL_STATE_READY;
706 :
707 100473 : case WORKERTYPE_APPLY:
708 100537 : return (rel->state == SUBREL_STATE_READY ||
709 64 : (rel->state == SUBREL_STATE_SYNCDONE &&
710 14 : rel->statelsn <= remote_final_lsn));
711 :
712 0 : case WORKERTYPE_SEQUENCESYNC:
713 : /* Should never happen. */
714 0 : elog(ERROR, "sequence synchronization worker is not expected to apply changes");
715 : break;
716 :
717 0 : case WORKERTYPE_UNKNOWN:
718 : /* Should never happen. */
719 0 : elog(ERROR, "Unknown worker type");
720 : }
721 :
722 0 : return false; /* dummy for compiler */
723 : }
724 :
725 : /*
726 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
727 : *
728 : * Start a transaction, if this is the first step (else we keep using the
729 : * existing transaction).
730 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
731 : */
732 : static void
733 172306 : begin_replication_step(void)
734 : {
735 172306 : SetCurrentStatementStartTimestamp();
736 :
737 172306 : if (!IsTransactionState())
738 : {
739 1012 : StartTransactionCommand();
740 1012 : maybe_reread_subscription();
741 : }
742 :
743 172304 : PushActiveSnapshot(GetTransactionSnapshot());
744 :
745 172304 : MemoryContextSwitchTo(ApplyMessageContext);
746 172304 : }
747 :
748 : /*
749 : * Finish up one step of a replication transaction.
750 : * Callers of begin_replication_step() must also call this.
751 : *
752 : * We don't close out the transaction here, but we should increment
753 : * the command counter to make the effects of this step visible.
754 : */
755 : static void
756 172238 : end_replication_step(void)
757 : {
758 172238 : PopActiveSnapshot();
759 :
760 172238 : CommandCounterIncrement();
761 172238 : }
762 :
763 : /*
764 : * Handle streamed transactions for both the leader apply worker and the
765 : * parallel apply workers.
766 : *
767 : * In the streaming case (receiving a block of the streamed transaction), for
768 : * serialize mode, simply redirect it to a file for the proper toplevel
769 : * transaction, and for parallel mode, the leader apply worker will send the
770 : * changes to parallel apply workers and the parallel apply worker will define
771 : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
772 : * messages will be applied by both leader apply worker and parallel apply
773 : * workers).
774 : *
775 : * Returns true for streamed transactions (when the change is either serialized
776 : * to file or sent to parallel apply worker), false otherwise (regular mode or
777 : * needs to be processed by parallel apply worker).
778 : *
779 : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
780 : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
781 : * to a parallel apply worker.
782 : */
783 : static bool
784 348219 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
785 : {
786 : TransactionId current_xid;
787 : ParallelApplyWorkerInfo *winfo;
788 : TransApplyAction apply_action;
789 : StringInfoData original_msg;
790 :
791 348219 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
792 :
793 : /* not in streaming mode */
794 348219 : if (apply_action == TRANS_LEADER_APPLY)
795 103913 : return false;
796 :
797 : Assert(TransactionIdIsValid(stream_xid));
798 :
799 : /*
800 : * The parallel apply worker needs the xid in this message to decide
801 : * whether to define a savepoint, so save the original message that has
802 : * not moved the cursor after the xid. We will serialize this message to a
803 : * file in PARTIAL_SERIALIZE mode.
804 : */
805 244306 : original_msg = *s;
806 :
807 : /*
808 : * We should have received XID of the subxact as the first part of the
809 : * message, so extract it.
810 : */
811 244306 : current_xid = pq_getmsgint(s, 4);
812 :
813 244306 : if (!TransactionIdIsValid(current_xid))
814 0 : ereport(ERROR,
815 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
816 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
817 :
818 244306 : switch (apply_action)
819 : {
820 102513 : case TRANS_LEADER_SERIALIZE:
821 : Assert(stream_fd);
822 :
823 : /* Add the new subxact to the array (unless already there). */
824 102513 : subxact_info_add(current_xid);
825 :
826 : /* Write the change to the current file */
827 102513 : stream_write_change(action, s);
828 102513 : return true;
829 :
830 68390 : case TRANS_LEADER_SEND_TO_PARALLEL:
831 : Assert(winfo);
832 :
833 : /*
834 : * XXX The publisher side doesn't always send relation/type update
835 : * messages after the streaming transaction, so also update the
836 : * relation/type in leader apply worker. See function
837 : * cleanup_rel_sync_cache.
838 : */
839 68390 : if (pa_send_data(winfo, s->len, s->data))
840 68390 : return (action != LOGICAL_REP_MSG_RELATION &&
841 : action != LOGICAL_REP_MSG_TYPE);
842 :
843 : /*
844 : * Switch to serialize mode when we are not able to send the
845 : * change to parallel apply worker.
846 : */
847 0 : pa_switch_to_partial_serialize(winfo, false);
848 :
849 : pg_fallthrough;
850 5006 : case TRANS_LEADER_PARTIAL_SERIALIZE:
851 5006 : stream_write_change(action, &original_msg);
852 :
853 : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
854 5006 : return (action != LOGICAL_REP_MSG_RELATION &&
855 : action != LOGICAL_REP_MSG_TYPE);
856 :
857 68397 : case TRANS_PARALLEL_APPLY:
858 68397 : parallel_stream_nchanges += 1;
859 :
860 : /* Define a savepoint for a subxact if needed. */
861 68397 : pa_start_subtrans(current_xid, stream_xid);
862 68397 : return false;
863 :
864 0 : default:
865 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
866 : return false; /* silence compiler warning */
867 : }
868 : }
869 :
870 : /*
871 : * Executor state preparation for evaluation of constraint expressions,
872 : * indexes and triggers for the specified relation.
873 : *
874 : * Note that the caller must open and close any indexes to be updated.
875 : */
876 : static ApplyExecutionData *
877 171760 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
878 : {
879 : ApplyExecutionData *edata;
880 : EState *estate;
881 : RangeTblEntry *rte;
882 171760 : List *perminfos = NIL;
883 : ResultRelInfo *resultRelInfo;
884 :
885 171760 : edata = palloc0_object(ApplyExecutionData);
886 171760 : edata->targetRel = rel;
887 :
888 171760 : edata->estate = estate = CreateExecutorState();
889 :
890 171760 : rte = makeNode(RangeTblEntry);
891 171760 : rte->rtekind = RTE_RELATION;
892 171760 : rte->relid = RelationGetRelid(rel->localrel);
893 171760 : rte->relkind = rel->localrel->rd_rel->relkind;
894 171760 : rte->rellockmode = AccessShareLock;
895 :
896 171760 : addRTEPermissionInfo(&perminfos, rte);
897 :
898 171760 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
899 : bms_make_singleton(1));
900 :
901 171760 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
902 :
903 : /*
904 : * Use Relation opened by logicalrep_rel_open() instead of opening it
905 : * again.
906 : */
907 171760 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
908 :
909 : /*
910 : * We put the ResultRelInfo in the es_opened_result_relations list, even
911 : * though we don't populate the es_result_relations array. That's a bit
912 : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
913 : *
914 : * ExecOpenIndices() is not called here either, each execution path doing
915 : * an apply operation being responsible for that.
916 : */
917 171760 : estate->es_opened_result_relations =
918 171760 : lappend(estate->es_opened_result_relations, resultRelInfo);
919 :
920 171760 : estate->es_output_cid = GetCurrentCommandId(true);
921 :
922 : /* Prepare to catch AFTER triggers. */
923 171760 : AfterTriggerBeginQuery();
924 :
925 : /* other fields of edata remain NULL for now */
926 :
927 171760 : return edata;
928 : }
929 :
930 : /*
931 : * Finish any operations related to the executor state created by
932 : * create_edata_for_relation().
933 : */
934 : static void
935 171709 : finish_edata(ApplyExecutionData *edata)
936 : {
937 171709 : EState *estate = edata->estate;
938 :
939 : /* Handle any queued AFTER triggers. */
940 171709 : AfterTriggerEndQuery(estate);
941 :
942 : /* Shut down tuple routing, if any was done. */
943 171709 : if (edata->proute)
944 74 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
945 :
946 : /*
947 : * Cleanup. It might seem that we should call ExecCloseResultRelations()
948 : * here, but we intentionally don't. It would close the rel we added to
949 : * es_opened_result_relations above, which is wrong because we took no
950 : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
951 : * any other relations opened during execution.
952 : */
953 171709 : ExecResetTupleTable(estate->es_tupleTable, false);
954 171709 : FreeExecutorState(estate);
955 171709 : pfree(edata);
956 171709 : }
957 :
958 : /*
959 : * Executes default values for columns for which we can't map to remote
960 : * relation columns.
961 : *
962 : * This allows us to support tables which have more columns on the downstream
963 : * than on the upstream.
964 : */
965 : static void
966 99480 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
967 : TupleTableSlot *slot)
968 : {
969 99480 : TupleDesc desc = RelationGetDescr(rel->localrel);
970 99480 : int num_phys_attrs = desc->natts;
971 : int i;
972 : int attnum,
973 99480 : num_defaults = 0;
974 : int *defmap;
975 : ExprState **defexprs;
976 : ExprContext *econtext;
977 :
978 99480 : econtext = GetPerTupleExprContext(estate);
979 :
980 : /* We got all the data via replication, no need to evaluate anything. */
981 99480 : if (num_phys_attrs == rel->remoterel.natts)
982 59335 : return;
983 :
984 40145 : defmap = palloc_array(int, num_phys_attrs);
985 40145 : defexprs = palloc_array(ExprState *, num_phys_attrs);
986 :
987 : Assert(rel->attrmap->maplen == num_phys_attrs);
988 210663 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
989 : {
990 170518 : CompactAttribute *cattr = TupleDescCompactAttr(desc, attnum);
991 : Expr *defexpr;
992 :
993 170518 : if (cattr->attisdropped || cattr->attgenerated)
994 9 : continue;
995 :
996 170509 : if (rel->attrmap->attnums[attnum] >= 0)
997 92268 : continue;
998 :
999 78241 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
1000 :
1001 78241 : if (defexpr != NULL)
1002 : {
1003 : /* Run the expression through planner */
1004 70131 : defexpr = expression_planner(defexpr);
1005 :
1006 : /* Initialize executable expression in copycontext */
1007 70131 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1008 70131 : defmap[num_defaults] = attnum;
1009 70131 : num_defaults++;
1010 : }
1011 : }
1012 :
1013 110276 : for (i = 0; i < num_defaults; i++)
1014 70131 : slot->tts_values[defmap[i]] =
1015 70131 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1016 : }
1017 :
1018 : /*
1019 : * Store tuple data into slot.
1020 : *
1021 : * Incoming data can be either text or binary format.
1022 : */
1023 : static void
1024 171779 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
1025 : LogicalRepTupleData *tupleData)
1026 : {
1027 171779 : int natts = slot->tts_tupleDescriptor->natts;
1028 : int i;
1029 :
1030 171779 : ExecClearTuple(slot);
1031 :
1032 : /* Call the "in" function for each non-dropped, non-null attribute */
1033 : Assert(natts == rel->attrmap->maplen);
1034 705259 : for (i = 0; i < natts; i++)
1035 : {
1036 533480 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1037 533480 : int remoteattnum = rel->attrmap->attnums[i];
1038 :
1039 533480 : if (!att->attisdropped && remoteattnum >= 0)
1040 326563 : {
1041 : StringInfo colvalue;
1042 :
1043 326563 : if (remoteattnum >= tupleData->ncols)
1044 0 : ereport(ERROR,
1045 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1046 : errmsg("logical replication column %d not found in tuple: only %d column(s) received",
1047 : remoteattnum + 1, tupleData->ncols)));
1048 :
1049 326563 : colvalue = &tupleData->colvalues[remoteattnum];
1050 :
1051 : /* Set attnum for error callback */
1052 326563 : apply_error_callback_arg.remote_attnum = remoteattnum;
1053 :
1054 326563 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1055 : {
1056 : Oid typinput;
1057 : Oid typioparam;
1058 :
1059 166237 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1060 332474 : slot->tts_values[i] =
1061 166237 : OidInputFunctionCall(typinput, colvalue->data,
1062 : typioparam, att->atttypmod);
1063 166237 : slot->tts_isnull[i] = false;
1064 : }
1065 160326 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1066 : {
1067 : Oid typreceive;
1068 : Oid typioparam;
1069 :
1070 : /*
1071 : * In some code paths we may be asked to re-parse the same
1072 : * tuple data. Reset the StringInfo's cursor so that works.
1073 : */
1074 109980 : colvalue->cursor = 0;
1075 :
1076 109980 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1077 219960 : slot->tts_values[i] =
1078 109980 : OidReceiveFunctionCall(typreceive, colvalue,
1079 : typioparam, att->atttypmod);
1080 :
1081 : /* Trouble if it didn't eat the whole buffer */
1082 109980 : if (colvalue->cursor != colvalue->len)
1083 0 : ereport(ERROR,
1084 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1085 : errmsg("incorrect binary data format in logical replication column %d",
1086 : remoteattnum + 1)));
1087 109980 : slot->tts_isnull[i] = false;
1088 : }
1089 : else
1090 : {
1091 : /*
1092 : * NULL value from remote. (We don't expect to see
1093 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1094 : * NULL.)
1095 : */
1096 50346 : slot->tts_values[i] = (Datum) 0;
1097 50346 : slot->tts_isnull[i] = true;
1098 : }
1099 :
1100 : /* Reset attnum for error callback */
1101 326563 : apply_error_callback_arg.remote_attnum = -1;
1102 : }
1103 : else
1104 : {
1105 : /*
1106 : * We assign NULL to dropped attributes and missing values
1107 : * (missing values should be later filled using
1108 : * slot_fill_defaults).
1109 : */
1110 206917 : slot->tts_values[i] = (Datum) 0;
1111 206917 : slot->tts_isnull[i] = true;
1112 : }
1113 : }
1114 :
1115 171779 : ExecStoreVirtualTuple(slot);
1116 171779 : }
1117 :
1118 : /*
1119 : * Replace updated columns with data from the LogicalRepTupleData struct.
1120 : * This is somewhat similar to heap_modify_tuple but also calls the type
1121 : * input functions on the user data.
1122 : *
1123 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
1124 : * columns provided in "tupleData" and leaving others as-is.
1125 : *
1126 : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
1127 : * storage for "srcslot". This is OK for current usage, but someday we may
1128 : * need to materialize "slot" at the end to make it independent of "srcslot".
1129 : */
1130 : static void
1131 31930 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
1132 : LogicalRepRelMapEntry *rel,
1133 : LogicalRepTupleData *tupleData)
1134 : {
1135 31930 : int natts = slot->tts_tupleDescriptor->natts;
1136 : int i;
1137 :
1138 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1139 31930 : ExecClearTuple(slot);
1140 :
1141 : /*
1142 : * Copy all the column data from srcslot, so that we'll have valid values
1143 : * for unreplaced columns.
1144 : */
1145 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
1146 31930 : slot_getallattrs(srcslot);
1147 31930 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1148 31930 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1149 :
1150 : /* Call the "in" function for each replaced attribute */
1151 : Assert(natts == rel->attrmap->maplen);
1152 159304 : for (i = 0; i < natts; i++)
1153 : {
1154 127374 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1155 127374 : int remoteattnum = rel->attrmap->attnums[i];
1156 :
1157 127374 : if (remoteattnum < 0)
1158 58519 : continue;
1159 :
1160 68855 : if (remoteattnum >= tupleData->ncols)
1161 0 : ereport(ERROR,
1162 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1163 : errmsg("logical replication column %d not found in tuple: only %d column(s) received",
1164 : remoteattnum + 1, tupleData->ncols)));
1165 :
1166 68855 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1167 : {
1168 68855 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1169 :
1170 : /* Set attnum for error callback */
1171 68855 : apply_error_callback_arg.remote_attnum = remoteattnum;
1172 :
1173 68855 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1174 : {
1175 : Oid typinput;
1176 : Oid typioparam;
1177 :
1178 25451 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1179 50902 : slot->tts_values[i] =
1180 25451 : OidInputFunctionCall(typinput, colvalue->data,
1181 : typioparam, att->atttypmod);
1182 25451 : slot->tts_isnull[i] = false;
1183 : }
1184 43404 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1185 : {
1186 : Oid typreceive;
1187 : Oid typioparam;
1188 :
1189 : /*
1190 : * In some code paths we may be asked to re-parse the same
1191 : * tuple data. Reset the StringInfo's cursor so that works.
1192 : */
1193 43356 : colvalue->cursor = 0;
1194 :
1195 43356 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1196 86712 : slot->tts_values[i] =
1197 43356 : OidReceiveFunctionCall(typreceive, colvalue,
1198 : typioparam, att->atttypmod);
1199 :
1200 : /* Trouble if it didn't eat the whole buffer */
1201 43356 : if (colvalue->cursor != colvalue->len)
1202 0 : ereport(ERROR,
1203 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1204 : errmsg("incorrect binary data format in logical replication column %d",
1205 : remoteattnum + 1)));
1206 43356 : slot->tts_isnull[i] = false;
1207 : }
1208 : else
1209 : {
1210 : /* must be LOGICALREP_COLUMN_NULL */
1211 48 : slot->tts_values[i] = (Datum) 0;
1212 48 : slot->tts_isnull[i] = true;
1213 : }
1214 :
1215 : /* Reset attnum for error callback */
1216 68855 : apply_error_callback_arg.remote_attnum = -1;
1217 : }
1218 : }
1219 :
1220 : /* And finally, declare that "slot" contains a valid virtual tuple */
1221 31930 : ExecStoreVirtualTuple(slot);
1222 31930 : }
1223 :
1224 : /*
1225 : * Handle BEGIN message.
1226 : */
1227 : static void
1228 533 : apply_handle_begin(StringInfo s)
1229 : {
1230 : LogicalRepBeginData begin_data;
1231 :
1232 : /* There must not be an active streaming transaction. */
1233 : Assert(!TransactionIdIsValid(stream_xid));
1234 :
1235 533 : logicalrep_read_begin(s, &begin_data);
1236 533 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1237 :
1238 533 : remote_final_lsn = begin_data.final_lsn;
1239 :
1240 533 : maybe_start_skipping_changes(begin_data.final_lsn);
1241 :
1242 533 : in_remote_transaction = true;
1243 :
1244 533 : pgstat_report_activity(STATE_RUNNING, NULL);
1245 533 : }
1246 :
1247 : /*
1248 : * Handle COMMIT message.
1249 : *
1250 : * TODO, support tracking of multiple origins
1251 : */
1252 : static void
1253 467 : apply_handle_commit(StringInfo s)
1254 : {
1255 : LogicalRepCommitData commit_data;
1256 :
1257 467 : logicalrep_read_commit(s, &commit_data);
1258 :
1259 467 : if (commit_data.commit_lsn != remote_final_lsn)
1260 0 : ereport(ERROR,
1261 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1262 : errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1263 : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1264 : LSN_FORMAT_ARGS(remote_final_lsn))));
1265 :
1266 467 : apply_handle_commit_internal(&commit_data);
1267 :
1268 : /*
1269 : * Process any tables that are being synchronized in parallel, as well as
1270 : * any newly added tables or sequences.
1271 : */
1272 467 : ProcessSyncingRelations(commit_data.end_lsn);
1273 :
1274 466 : pgstat_report_activity(STATE_IDLE, NULL);
1275 466 : reset_apply_error_context_info();
1276 466 : }
1277 :
1278 : /*
1279 : * Handle BEGIN PREPARE message.
1280 : */
1281 : static void
1282 18 : apply_handle_begin_prepare(StringInfo s)
1283 : {
1284 : LogicalRepPreparedTxnData begin_data;
1285 :
1286 : /* Tablesync should never receive prepare. */
1287 18 : if (am_tablesync_worker())
1288 0 : ereport(ERROR,
1289 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1290 : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1291 :
1292 : /* There must not be an active streaming transaction. */
1293 : Assert(!TransactionIdIsValid(stream_xid));
1294 :
1295 18 : logicalrep_read_begin_prepare(s, &begin_data);
1296 18 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1297 :
1298 18 : remote_final_lsn = begin_data.prepare_lsn;
1299 :
1300 18 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1301 :
1302 18 : in_remote_transaction = true;
1303 :
1304 18 : pgstat_report_activity(STATE_RUNNING, NULL);
1305 18 : }
1306 :
1307 : /*
1308 : * Common function to prepare the GID.
1309 : */
1310 : static void
1311 27 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1312 : {
1313 : char gid[GIDSIZE];
1314 :
1315 : /*
1316 : * Compute unique GID for two_phase transactions. We don't use GID of
1317 : * prepared transaction sent by server as that can lead to deadlock when
1318 : * we have multiple subscriptions from same node point to publications on
1319 : * the same node. See comments atop worker.c
1320 : */
1321 27 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1322 : gid, sizeof(gid));
1323 :
1324 : /*
1325 : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1326 : * called within the PrepareTransactionBlock below.
1327 : */
1328 27 : if (!IsTransactionBlock())
1329 : {
1330 27 : BeginTransactionBlock();
1331 27 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1332 : }
1333 :
1334 : /*
1335 : * Update origin state so we can restart streaming from correct position
1336 : * in case of crash.
1337 : */
1338 27 : replorigin_xact_state.origin_lsn = prepare_data->end_lsn;
1339 27 : replorigin_xact_state.origin_timestamp = prepare_data->prepare_time;
1340 :
1341 27 : PrepareTransactionBlock(gid);
1342 27 : }
1343 :
1344 : /*
1345 : * Handle PREPARE message.
1346 : */
1347 : static void
1348 17 : apply_handle_prepare(StringInfo s)
1349 : {
1350 : LogicalRepPreparedTxnData prepare_data;
1351 :
1352 17 : logicalrep_read_prepare(s, &prepare_data);
1353 :
1354 17 : if (prepare_data.prepare_lsn != remote_final_lsn)
1355 0 : ereport(ERROR,
1356 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1357 : errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1358 : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1359 : LSN_FORMAT_ARGS(remote_final_lsn))));
1360 :
1361 : /*
1362 : * Unlike commit, here, we always prepare the transaction even though no
1363 : * change has happened in this transaction or all changes are skipped. It
1364 : * is done this way because at commit prepared time, we won't know whether
1365 : * we have skipped preparing a transaction because of those reasons.
1366 : *
1367 : * XXX, We can optimize such that at commit prepared time, we first check
1368 : * whether we have prepared the transaction or not but that doesn't seem
1369 : * worthwhile because such cases shouldn't be common.
1370 : */
1371 17 : begin_replication_step();
1372 :
1373 17 : apply_handle_prepare_internal(&prepare_data);
1374 :
1375 17 : end_replication_step();
1376 17 : CommitTransactionCommand();
1377 15 : pgstat_report_stat(false);
1378 :
1379 : /*
1380 : * It is okay not to set the local_end LSN for the prepare because we
1381 : * always flush the prepare record. So, we can send the acknowledgment of
1382 : * the remote_end LSN as soon as prepare is finished.
1383 : *
1384 : * XXX For the sake of consistency with commit, we could have set it with
1385 : * the LSN of prepare but as of now we don't track that value similar to
1386 : * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
1387 : * it.
1388 : */
1389 15 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1390 :
1391 15 : in_remote_transaction = false;
1392 :
1393 : /*
1394 : * Process any tables that are being synchronized in parallel, as well as
1395 : * any newly added tables or sequences.
1396 : */
1397 15 : ProcessSyncingRelations(prepare_data.end_lsn);
1398 :
1399 : /*
1400 : * Since we have already prepared the transaction, in a case where the
1401 : * server crashes before clearing the subskiplsn, it will be left but the
1402 : * transaction won't be resent. But that's okay because it's a rare case
1403 : * and the subskiplsn will be cleared when finishing the next transaction.
1404 : */
1405 15 : stop_skipping_changes();
1406 15 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1407 :
1408 15 : pgstat_report_activity(STATE_IDLE, NULL);
1409 15 : reset_apply_error_context_info();
1410 15 : }
1411 :
1412 : /*
1413 : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1414 : *
1415 : * Note that we don't need to wait here if the transaction was prepared in a
1416 : * parallel apply worker. In that case, we have already waited for the prepare
1417 : * to finish in apply_handle_stream_prepare() which will ensure all the
1418 : * operations in that transaction have happened in the subscriber, so no
1419 : * concurrent transaction can cause deadlock or transaction dependency issues.
1420 : */
1421 : static void
1422 22 : apply_handle_commit_prepared(StringInfo s)
1423 : {
1424 : LogicalRepCommitPreparedTxnData prepare_data;
1425 : char gid[GIDSIZE];
1426 :
1427 22 : logicalrep_read_commit_prepared(s, &prepare_data);
1428 22 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1429 :
1430 : /* Compute GID for two_phase transactions. */
1431 22 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1432 : gid, sizeof(gid));
1433 :
1434 : /* There is no transaction when COMMIT PREPARED is called */
1435 22 : begin_replication_step();
1436 :
1437 : /*
1438 : * Update origin state so we can restart streaming from correct position
1439 : * in case of crash.
1440 : */
1441 22 : replorigin_xact_state.origin_lsn = prepare_data.end_lsn;
1442 22 : replorigin_xact_state.origin_timestamp = prepare_data.commit_time;
1443 :
1444 22 : FinishPreparedTransaction(gid, true);
1445 22 : end_replication_step();
1446 22 : CommitTransactionCommand();
1447 22 : pgstat_report_stat(false);
1448 :
1449 22 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1450 22 : in_remote_transaction = false;
1451 :
1452 : /*
1453 : * Process any tables that are being synchronized in parallel, as well as
1454 : * any newly added tables or sequences.
1455 : */
1456 22 : ProcessSyncingRelations(prepare_data.end_lsn);
1457 :
1458 22 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1459 :
1460 22 : pgstat_report_activity(STATE_IDLE, NULL);
1461 22 : reset_apply_error_context_info();
1462 22 : }
1463 :
1464 : /*
1465 : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1466 : *
1467 : * Note that we don't need to wait here if the transaction was prepared in a
1468 : * parallel apply worker. In that case, we have already waited for the prepare
1469 : * to finish in apply_handle_stream_prepare() which will ensure all the
1470 : * operations in that transaction have happened in the subscriber, so no
1471 : * concurrent transaction can cause deadlock or transaction dependency issues.
1472 : */
1473 : static void
1474 5 : apply_handle_rollback_prepared(StringInfo s)
1475 : {
1476 : LogicalRepRollbackPreparedTxnData rollback_data;
1477 : char gid[GIDSIZE];
1478 :
1479 5 : logicalrep_read_rollback_prepared(s, &rollback_data);
1480 5 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1481 :
1482 : /* Compute GID for two_phase transactions. */
1483 5 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1484 : gid, sizeof(gid));
1485 :
1486 : /*
1487 : * It is possible that we haven't received prepare because it occurred
1488 : * before walsender reached a consistent point or the two_phase was still
1489 : * not enabled by that time, so in such cases, we need to skip rollback
1490 : * prepared.
1491 : */
1492 5 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1493 : rollback_data.prepare_time))
1494 : {
1495 : /*
1496 : * Update origin state so we can restart streaming from correct
1497 : * position in case of crash.
1498 : */
1499 5 : replorigin_xact_state.origin_lsn = rollback_data.rollback_end_lsn;
1500 5 : replorigin_xact_state.origin_timestamp = rollback_data.rollback_time;
1501 :
1502 : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1503 5 : begin_replication_step();
1504 5 : FinishPreparedTransaction(gid, false);
1505 5 : end_replication_step();
1506 5 : CommitTransactionCommand();
1507 :
1508 5 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1509 : }
1510 :
1511 5 : pgstat_report_stat(false);
1512 :
1513 : /*
1514 : * It is okay not to set the local_end LSN for the rollback of prepared
1515 : * transaction because we always flush the WAL record for it. See
1516 : * apply_handle_prepare.
1517 : */
1518 5 : store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
1519 5 : in_remote_transaction = false;
1520 :
1521 : /*
1522 : * Process any tables that are being synchronized in parallel, as well as
1523 : * any newly added tables or sequences.
1524 : */
1525 5 : ProcessSyncingRelations(rollback_data.rollback_end_lsn);
1526 :
1527 5 : pgstat_report_activity(STATE_IDLE, NULL);
1528 5 : reset_apply_error_context_info();
1529 5 : }
1530 :
/*
 * Handle STREAM PREPARE.
 *
 * Depending on the apply action chosen for this transaction, we either
 * replay the spooled changes and prepare locally (leader), forward the
 * message to a parallel apply worker, serialize it to a spool file, or
 * perform the prepare ourselves (parallel apply worker).
 */
static void
apply_handle_stream_prepare(StringInfo s)
{
	LogicalRepPreparedTxnData prepare_data;
	ParallelApplyWorkerInfo *winfo;
	TransApplyAction apply_action;

	/*
	 * Save the message before it is consumed, so the original bytes can be
	 * forwarded or serialized below after 's' has been parsed.
	 */
	StringInfoData original_msg = *s;

	if (in_streamed_transaction)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("STREAM PREPARE message without STREAM STOP")));

	/* Tablesync should never receive prepare. */
	if (am_tablesync_worker())
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));

	logicalrep_read_stream_prepare(s, &prepare_data);
	set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);

	apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);

	switch (apply_action)
	{
		case TRANS_LEADER_APPLY:

			/*
			 * The transaction has been serialized to file, so replay all the
			 * spooled operations.
			 */
			apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
								   prepare_data.xid, prepare_data.prepare_lsn);

			/* Mark the transaction as prepared. */
			apply_handle_prepare_internal(&prepare_data);

			CommitTransactionCommand();

			/*
			 * It is okay not to set the local_end LSN for the prepare because
			 * we always flush the prepare record. See apply_handle_prepare.
			 */
			store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);

			in_remote_transaction = false;

			/* Unlink the files with serialized changes and subxact info. */
			stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);

			elog(DEBUG1, "finished processing the STREAM PREPARE command");
			break;

		case TRANS_LEADER_SEND_TO_PARALLEL:
			Assert(winfo);

			if (pa_send_data(winfo, s->len, s->data))
			{
				/* Finish processing the streaming transaction. */
				pa_xact_finish(winfo, prepare_data.end_lsn);
				break;
			}

			/*
			 * Switch to serialize mode when we are not able to send the
			 * change to parallel apply worker.
			 */
			pa_switch_to_partial_serialize(winfo, true);

			pg_fallthrough;
		case TRANS_LEADER_PARTIAL_SERIALIZE:
			Assert(winfo);

			/* Spool the original message for the parallel apply worker. */
			stream_open_and_write_change(prepare_data.xid,
										 LOGICAL_REP_MSG_STREAM_PREPARE,
										 &original_msg);

			pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);

			/* Finish processing the streaming transaction. */
			pa_xact_finish(winfo, prepare_data.end_lsn);
			break;

		case TRANS_PARALLEL_APPLY:

			/*
			 * If the parallel apply worker is applying spooled messages then
			 * close the file before preparing.
			 */
			if (stream_fd)
				stream_close_file();

			begin_replication_step();

			/* Mark the transaction as prepared. */
			apply_handle_prepare_internal(&prepare_data);

			end_replication_step();

			CommitTransactionCommand();

			/*
			 * It is okay not to set the local_end LSN for the prepare because
			 * we always flush the prepare record. See apply_handle_prepare.
			 */
			MyParallelShared->last_commit_end = InvalidXLogRecPtr;

			pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
			pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);

			pa_reset_subtrans();

			elog(DEBUG1, "finished processing the STREAM PREPARE command");
			break;

		default:
			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
			break;
	}

	pgstat_report_stat(false);

	/*
	 * Process any tables that are being synchronized in parallel, as well as
	 * any newly added tables or sequences.
	 */
	ProcessSyncingRelations(prepare_data.end_lsn);

	/*
	 * Similar to prepare case, the subskiplsn could be left in a case of
	 * server crash but it's okay. See the comments in apply_handle_prepare().
	 */
	stop_skipping_changes();
	clear_subscription_skip_lsn(prepare_data.prepare_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);

	reset_apply_error_context_info();
}
1676 :
1677 : /*
1678 : * Handle ORIGIN message.
1679 : *
1680 : * TODO, support tracking of multiple origins
1681 : */
1682 : static void
1683 7 : apply_handle_origin(StringInfo s)
1684 : {
1685 : /*
1686 : * ORIGIN message can only come inside streaming transaction or inside
1687 : * remote transaction and before any actual writes.
1688 : */
1689 7 : if (!in_streamed_transaction &&
1690 10 : (!in_remote_transaction ||
1691 5 : (IsTransactionState() && !am_tablesync_worker())))
1692 0 : ereport(ERROR,
1693 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1694 : errmsg_internal("ORIGIN message sent out of order")));
1695 7 : }
1696 :
/*
 * Initialize fileset (if not already done) and open the spool file for the
 * given remote transaction.
 *
 * Create a new file when first_segment is true, otherwise open the existing
 * file (and, for later segments, also restore the subxact bookkeeping).
 * 'xid' is the remote toplevel transaction ID being streamed.
 */
void
stream_start_internal(TransactionId xid, bool first_segment)
{
	/* The BufFile machinery below requires an open transaction. */
	begin_replication_step();

	/*
	 * Initialize the worker's stream_fileset if we haven't yet. This will be
	 * used for the entire duration of the worker so create it in a permanent
	 * context. We create this on the very first streaming message from any
	 * transaction and then use it for this and other streaming transactions.
	 * Now, we could create a fileset at the start of the worker as well but
	 * then we won't be sure that it will ever be used.
	 */
	if (!MyLogicalRepWorker->stream_fileset)
	{
		MemoryContext oldctx;

		/* ApplyContext outlives individual transactions. */
		oldctx = MemoryContextSwitchTo(ApplyContext);

		MyLogicalRepWorker->stream_fileset = palloc_object(FileSet);
		FileSetInit(MyLogicalRepWorker->stream_fileset);

		MemoryContextSwitchTo(oldctx);
	}

	/* Open the spool file for this transaction. */
	stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);

	/* If this is not the first segment, open existing subxact file. */
	if (!first_segment)
		subxact_info_read(MyLogicalRepWorker->subid, xid);

	end_replication_step();
}
1737 :
/*
 * Handle STREAM START message.
 *
 * Depending on the apply action, either serialize the upcoming chunk of
 * changes to a file, hand it off to a parallel apply worker, or (in a
 * parallel apply worker) set up state for applying the chunk directly.
 */
static void
apply_handle_stream_start(StringInfo s)
{
	bool		first_segment;
	ParallelApplyWorkerInfo *winfo;
	TransApplyAction apply_action;

	/*
	 * Save the message before it is consumed, so the original bytes can be
	 * forwarded or serialized after 's' has been parsed.
	 */
	StringInfoData original_msg = *s;

	if (in_streamed_transaction)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("duplicate STREAM START message")));

	/* There must not be an active streaming transaction. */
	Assert(!TransactionIdIsValid(stream_xid));

	/* notify handle methods we're processing a remote transaction */
	in_streamed_transaction = true;

	/* extract XID of the top-level transaction */
	stream_xid = logicalrep_read_stream_start(s, &first_segment);

	if (!TransactionIdIsValid(stream_xid))
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("invalid transaction ID in streamed replication transaction")));

	/* No LSN is known yet for this chunk, hence InvalidXLogRecPtr. */
	set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);

	/* Try to allocate a worker for the streaming transaction. */
	if (first_segment)
		pa_allocate_worker(stream_xid);

	apply_action = get_transaction_apply_action(stream_xid, &winfo);

	switch (apply_action)
	{
		case TRANS_LEADER_SERIALIZE:

			/*
			 * Function stream_start_internal starts a transaction. This
			 * transaction will be committed on the stream stop unless it is a
			 * tablesync worker in which case it will be committed after
			 * processing all the messages. We need this transaction for
			 * handling the BufFile, used for serializing the streaming data
			 * and subxact info.
			 */
			stream_start_internal(stream_xid, first_segment);
			break;

		case TRANS_LEADER_SEND_TO_PARALLEL:
			Assert(winfo);

			/*
			 * Once we start serializing the changes, the parallel apply
			 * worker will wait for the leader to release the stream lock
			 * until the end of the transaction. So, we don't need to release
			 * the lock or increment the stream count in that case.
			 */
			if (pa_send_data(winfo, s->len, s->data))
			{
				/*
				 * Unlock the shared object lock so that the parallel apply
				 * worker can continue to receive changes.
				 */
				if (!first_segment)
					pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);

				/*
				 * Increment the number of streaming blocks waiting to be
				 * processed by parallel apply worker.
				 */
				pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);

				/* Cache the parallel apply worker for this transaction. */
				pa_set_stream_apply_worker(winfo);
				break;
			}

			/*
			 * Switch to serialize mode when we are not able to send the
			 * change to parallel apply worker.
			 */
			pa_switch_to_partial_serialize(winfo, !first_segment);

			pg_fallthrough;
		case TRANS_LEADER_PARTIAL_SERIALIZE:
			Assert(winfo);

			/*
			 * Open the spool file unless it was already opened when switching
			 * to serialize mode. The transaction started in
			 * stream_start_internal will be committed on the stream stop.
			 */
			if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
				stream_start_internal(stream_xid, first_segment);

			stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);

			/* Cache the parallel apply worker for this transaction. */
			pa_set_stream_apply_worker(winfo);
			break;

		case TRANS_PARALLEL_APPLY:
			if (first_segment)
			{
				/* Hold the lock until the end of the transaction. */
				pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
				pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);

				/*
				 * Signal the leader apply worker, as it may be waiting for
				 * us.
				 */
				logicalrep_worker_wakeup(WORKERTYPE_APPLY,
										 MyLogicalRepWorker->subid, InvalidOid);
			}

			/* Per-chunk change counter, reported at STREAM STOP. */
			parallel_stream_nchanges = 0;
			break;

		default:
			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
			break;
	}

	pgstat_report_activity(STATE_RUNNING, NULL);
}
1871 :
/*
 * Update the information about subxacts and close the file.
 *
 * This function should be called when the stream_start_internal function has
 * been called; it commits the per-stream transaction started there.
 * 'xid' is the remote toplevel transaction whose stream chunk is finishing.
 */
void
stream_stop_internal(TransactionId xid)
{
	/*
	 * Serialize information about subxacts for the toplevel transaction, then
	 * close the stream messages spool file.
	 */
	subxact_info_write(MyLogicalRepWorker->subid, xid);
	stream_close_file();

	/* We must be in a valid transaction state */
	Assert(IsTransactionState());

	/* Commit the per-stream transaction */
	CommitTransactionCommand();

	/* Reset per-stream context */
	MemoryContextReset(LogicalStreamingContext);
}
1897 :
/*
 * Handle STREAM STOP message.
 *
 * Finishes the current chunk of a streamed transaction: close the spool
 * file, hand the message to a parallel apply worker, or (in a parallel
 * apply worker) wait for the next chunk.
 */
static void
apply_handle_stream_stop(StringInfo s)
{
	ParallelApplyWorkerInfo *winfo;
	TransApplyAction apply_action;

	if (!in_streamed_transaction)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("STREAM STOP message without STREAM START")));

	apply_action = get_transaction_apply_action(stream_xid, &winfo);

	switch (apply_action)
	{
		case TRANS_LEADER_SERIALIZE:
			stream_stop_internal(stream_xid);
			break;

		case TRANS_LEADER_SEND_TO_PARALLEL:
			Assert(winfo);

			/*
			 * Lock before sending the STREAM_STOP message so that the leader
			 * can hold the lock first and the parallel apply worker will wait
			 * for leader to release the lock. See Locking Considerations atop
			 * applyparallelworker.c.
			 */
			pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);

			if (pa_send_data(winfo, s->len, s->data))
			{
				pa_set_stream_apply_worker(NULL);
				break;
			}

			/*
			 * Switch to serialize mode when we are not able to send the
			 * change to parallel apply worker.
			 */
			pa_switch_to_partial_serialize(winfo, true);

			pg_fallthrough;
		case TRANS_LEADER_PARTIAL_SERIALIZE:
			stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
			stream_stop_internal(stream_xid);
			pa_set_stream_apply_worker(NULL);
			break;

		case TRANS_PARALLEL_APPLY:
			elog(DEBUG1, "applied %u changes in the streaming chunk",
				 parallel_stream_nchanges);

			/*
			 * By the time parallel apply worker is processing the changes in
			 * the current streaming block, the leader apply worker may have
			 * sent multiple streaming blocks. This can lead to parallel apply
			 * worker start waiting even when there are more chunk of streams
			 * in the queue. So, try to lock only if there is no message left
			 * in the queue. See Locking Considerations atop
			 * applyparallelworker.c.
			 *
			 * Note that here we have a race condition where we can start
			 * waiting even when there are pending streaming chunks. This can
			 * happen if the leader sends another streaming block and acquires
			 * the stream lock again after the parallel apply worker checks
			 * that there is no pending streaming block and before it actually
			 * starts waiting on a lock. We can handle this case by not
			 * allowing the leader to increment the stream block count during
			 * the time parallel apply worker acquires the lock but it is not
			 * clear whether that is worth the complexity.
			 *
			 * Now, if this missed chunk contains rollback to savepoint, then
			 * there is a risk of deadlock which probably shouldn't happen
			 * after restart.
			 */
			pa_decr_and_wait_stream_block();
			break;

		default:
			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
			break;
	}

	/* The chunk is over; forget the streamed transaction until next START. */
	in_streamed_transaction = false;
	stream_xid = InvalidTransactionId;

	/*
	 * The parallel apply worker could be in a transaction in which case we
	 * need to report the state as STATE_IDLEINTRANSACTION.
	 */
	if (IsTransactionOrTransactionBlock())
		pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
	else
		pgstat_report_activity(STATE_IDLE, NULL);

	reset_apply_error_context_info();
}
1999 :
/*
 * Helper function to handle STREAM ABORT message when the transaction was
 * serialized to file.
 *
 * 'xid' is the remote toplevel transaction; 'subxid' is the aborted
 * subtransaction.  If they are equal, the whole transaction is aborted and
 * its spool files are removed; otherwise the changes file is truncated back
 * to the aborted subtransaction's recorded offset.
 */
static void
stream_abort_internal(TransactionId xid, TransactionId subxid)
{
	/*
	 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
	 * just delete the files with serialized info.
	 */
	if (xid == subxid)
		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
	else
	{
		/*
		 * OK, so it's a subxact. We need to read the subxact file for the
		 * toplevel transaction, determine the offset tracked for the subxact,
		 * and truncate the file with changes. We also remove the subxacts
		 * with higher offsets (or rather higher XIDs).
		 *
		 * We intentionally scan the array from the tail, because we're likely
		 * aborting a change for the most recent subtransactions.
		 *
		 * We can't use the binary search here as subxact XIDs won't
		 * necessarily arrive in sorted order, consider the case where we have
		 * released the savepoint for multiple subtransactions and then
		 * performed rollback to savepoint for one of the earlier
		 * sub-transaction.
		 */
		int64		i;
		int64		subidx;
		BufFile    *fd;
		bool		found = false;
		char		path[MAXPGPATH];

		subidx = -1;
		begin_replication_step();
		subxact_info_read(MyLogicalRepWorker->subid, xid);

		/* Search from the tail for the aborted subtransaction. */
		for (i = subxact_data.nsubxacts; i > 0; i--)
		{
			if (subxact_data.subxacts[i - 1].xid == subxid)
			{
				subidx = (i - 1);
				found = true;
				break;
			}
		}

		/*
		 * If it's an empty sub-transaction then we will not find the subxid
		 * here so just cleanup the subxact info and return.
		 */
		if (!found)
		{
			/* Cleanup the subxact info */
			cleanup_subxact_info();
			end_replication_step();
			CommitTransactionCommand();
			return;
		}

		/* open the changes file */
		changes_filename(path, MyLogicalRepWorker->subid, xid);
		fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
								O_RDWR, false);

		/* OK, truncate the file at the right offset */
		BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
							   subxact_data.subxacts[subidx].offset);
		BufFileClose(fd);

		/* discard the subxacts added later */
		subxact_data.nsubxacts = subidx;

		/* write the updated subxact list */
		subxact_info_write(MyLogicalRepWorker->subid, xid);

		end_replication_step();
		CommitTransactionCommand();
	}
}
2083 :
/*
 * Handle STREAM ABORT message.
 *
 * Aborts either the whole streamed transaction (xid == subxid) or one of
 * its subtransactions, routing the work to the appropriate worker according
 * to the apply action.
 */
static void
apply_handle_stream_abort(StringInfo s)
{
	TransactionId xid;
	TransactionId subxid;
	LogicalRepStreamAbortData abort_data;
	ParallelApplyWorkerInfo *winfo;
	TransApplyAction apply_action;

	/*
	 * Save the message before it is consumed, so the original bytes can be
	 * serialized below if needed.
	 */
	StringInfoData original_msg = *s;
	bool		toplevel_xact;

	if (in_streamed_transaction)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("STREAM ABORT message without STREAM STOP")));

	/* We receive abort information only when we can apply in parallel. */
	logicalrep_read_stream_abort(s, &abort_data,
								 MyLogicalRepWorker->parallel_apply);

	xid = abort_data.xid;
	subxid = abort_data.subxid;
	toplevel_xact = (xid == subxid);

	set_apply_error_context_xact(subxid, abort_data.abort_lsn);

	apply_action = get_transaction_apply_action(xid, &winfo);

	switch (apply_action)
	{
		case TRANS_LEADER_APPLY:

			/*
			 * We are in the leader apply worker and the transaction has been
			 * serialized to file.
			 */
			stream_abort_internal(xid, subxid);

			elog(DEBUG1, "finished processing the STREAM ABORT command");
			break;

		case TRANS_LEADER_SEND_TO_PARALLEL:
			Assert(winfo);

			/*
			 * For the case of aborting the subtransaction, we increment the
			 * number of streaming blocks and take the lock again before
			 * sending the STREAM_ABORT to ensure that the parallel apply
			 * worker will wait on the lock for the next set of changes after
			 * processing the STREAM_ABORT message if it is not already
			 * waiting for STREAM_STOP message.
			 *
			 * It is important to perform this locking before sending the
			 * STREAM_ABORT message so that the leader can hold the lock first
			 * and the parallel apply worker will wait for the leader to
			 * release the lock. This is the same as what we do in
			 * apply_handle_stream_stop. See Locking Considerations atop
			 * applyparallelworker.c.
			 */
			if (!toplevel_xact)
			{
				pa_unlock_stream(xid, AccessExclusiveLock);
				pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
				pa_lock_stream(xid, AccessExclusiveLock);
			}

			if (pa_send_data(winfo, s->len, s->data))
			{
				/*
				 * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
				 * wait here for the parallel apply worker to finish as that
				 * is not required to maintain the commit order and won't have
				 * the risk of failures due to transaction dependencies and
				 * deadlocks. However, it is possible that before the parallel
				 * worker finishes and we clear the worker info, the xid
				 * wraparound happens on the upstream and a new transaction
				 * with the same xid can appear and that can lead to duplicate
				 * entries in ParallelApplyTxnHash. Yet another problem could
				 * be that we may have serialized the changes in partial
				 * serialize mode and the file containing xact changes may
				 * already exist, and after xid wraparound trying to create
				 * the file for the same xid can lead to an error. To avoid
				 * these problems, we decide to wait for the aborts to finish.
				 *
				 * Note, it is okay to not update the flush location position
				 * for aborts as in worst case that means such a transaction
				 * won't be sent again after restart.
				 */
				if (toplevel_xact)
					pa_xact_finish(winfo, InvalidXLogRecPtr);

				break;
			}

			/*
			 * Switch to serialize mode when we are not able to send the
			 * change to parallel apply worker.
			 */
			pa_switch_to_partial_serialize(winfo, true);

			pg_fallthrough;
		case TRANS_LEADER_PARTIAL_SERIALIZE:
			Assert(winfo);

			/*
			 * Parallel apply worker might have applied some changes, so write
			 * the STREAM_ABORT message so that it can rollback the
			 * subtransaction if needed.
			 */
			stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
										 &original_msg);

			if (toplevel_xact)
			{
				pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
				pa_xact_finish(winfo, InvalidXLogRecPtr);
			}
			break;

		case TRANS_PARALLEL_APPLY:

			/*
			 * If the parallel apply worker is applying spooled messages then
			 * close the file before aborting.
			 */
			if (toplevel_xact && stream_fd)
				stream_close_file();

			pa_stream_abort(&abort_data);

			/*
			 * We need to wait after processing rollback to savepoint for the
			 * next set of changes.
			 *
			 * We have a race condition here due to which we can start waiting
			 * here when there are more chunk of streams in the queue. See
			 * apply_handle_stream_stop.
			 */
			if (!toplevel_xact)
				pa_decr_and_wait_stream_block();

			elog(DEBUG1, "finished processing the STREAM ABORT command");
			break;

		default:
			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
			break;
	}

	reset_apply_error_context_info();
}
2240 :
2241 : /*
2242 : * Ensure that the passed location is fileset's end.
2243 : */
2244 : static void
2245 4 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2246 : pgoff_t offset)
2247 : {
2248 : char path[MAXPGPATH];
2249 : BufFile *fd;
2250 : int last_fileno;
2251 : pgoff_t last_offset;
2252 :
2253 : Assert(!IsTransactionState());
2254 :
2255 4 : begin_replication_step();
2256 :
2257 4 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2258 :
2259 4 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2260 :
2261 4 : BufFileSeek(fd, 0, 0, SEEK_END);
2262 4 : BufFileTell(fd, &last_fileno, &last_offset);
2263 :
2264 4 : BufFileClose(fd);
2265 :
2266 4 : end_replication_step();
2267 :
2268 4 : if (last_fileno != fileno || last_offset != offset)
2269 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2270 : path);
2271 4 : }
2272 :
2273 : /*
2274 : * Common spoolfile processing.
2275 : */
2276 : void
2277 31 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2278 : XLogRecPtr lsn)
2279 : {
2280 : int nchanges;
2281 : char path[MAXPGPATH];
2282 31 : char *buffer = NULL;
2283 : MemoryContext oldcxt;
2284 : ResourceOwner oldowner;
2285 : int fileno;
2286 : pgoff_t offset;
2287 :
2288 31 : if (!am_parallel_apply_worker())
2289 27 : maybe_start_skipping_changes(lsn);
2290 :
2291 : /* Make sure we have an open transaction */
2292 31 : begin_replication_step();
2293 :
2294 : /*
2295 : * Allocate file handle and memory required to process all the messages in
2296 : * TopTransactionContext to avoid them getting reset after each message is
2297 : * processed.
2298 : */
2299 31 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2300 :
2301 : /* Open the spool file for the committed/prepared transaction */
2302 31 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2303 31 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2304 :
2305 : /*
2306 : * Make sure the file is owned by the toplevel transaction so that the
2307 : * file will not be accidentally closed when aborting a subtransaction.
2308 : */
2309 31 : oldowner = CurrentResourceOwner;
2310 31 : CurrentResourceOwner = TopTransactionResourceOwner;
2311 :
2312 31 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2313 :
2314 31 : CurrentResourceOwner = oldowner;
2315 :
2316 31 : buffer = palloc(BLCKSZ);
2317 :
2318 31 : MemoryContextSwitchTo(oldcxt);
2319 :
2320 31 : remote_final_lsn = lsn;
2321 :
2322 : /*
2323 : * Make sure the handle apply_dispatch methods are aware we're in a remote
2324 : * transaction.
2325 : */
2326 31 : in_remote_transaction = true;
2327 31 : pgstat_report_activity(STATE_RUNNING, NULL);
2328 :
2329 31 : end_replication_step();
2330 :
2331 : /*
2332 : * Read the entries one by one and pass them through the same logic as in
2333 : * apply_dispatch.
2334 : */
2335 31 : nchanges = 0;
2336 : while (true)
2337 88470 : {
2338 : StringInfoData s2;
2339 : size_t nbytes;
2340 : int len;
2341 :
2342 88501 : CHECK_FOR_INTERRUPTS();
2343 :
2344 : /* read length of the on-disk record */
2345 88501 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2346 :
2347 : /* have we reached end of the file? */
2348 88501 : if (nbytes == 0)
2349 26 : break;
2350 :
2351 : /* do we have a correct length? */
2352 88475 : if (len <= 0)
2353 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2354 : len, path);
2355 :
2356 : /* make sure we have sufficiently large buffer */
2357 88475 : buffer = repalloc(buffer, len);
2358 :
2359 : /* and finally read the data into the buffer */
2360 88475 : BufFileReadExact(stream_fd, buffer, len);
2361 :
2362 88475 : BufFileTell(stream_fd, &fileno, &offset);
2363 :
2364 : /* init a stringinfo using the buffer and call apply_dispatch */
2365 88475 : initReadOnlyStringInfo(&s2, buffer, len);
2366 :
2367 : /* Ensure we are reading the data into our memory context. */
2368 88475 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2369 :
2370 88475 : apply_dispatch(&s2);
2371 :
2372 88474 : MemoryContextReset(ApplyMessageContext);
2373 :
2374 88474 : MemoryContextSwitchTo(oldcxt);
2375 :
2376 88474 : nchanges++;
2377 :
2378 : /*
2379 : * It is possible the file has been closed because we have processed
2380 : * the transaction end message like stream_commit in which case that
2381 : * must be the last message.
2382 : */
2383 88474 : if (!stream_fd)
2384 : {
2385 4 : ensure_last_message(stream_fileset, xid, fileno, offset);
2386 4 : break;
2387 : }
2388 :
2389 88470 : if (nchanges % 1000 == 0)
2390 84 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2391 : nchanges, path);
2392 : }
2393 :
2394 30 : if (stream_fd)
2395 26 : stream_close_file();
2396 :
2397 30 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2398 : nchanges, path);
2399 :
2400 30 : return;
2401 : }
2402 :
/*
 * Handle STREAM COMMIT message.
 *
 * Depending on the apply action, either replay the spooled changes and
 * commit locally, forward the message to a parallel apply worker, serialize
 * it to a file, or (in a parallel apply worker) commit the transaction.
 */
static void
apply_handle_stream_commit(StringInfo s)
{
	TransactionId xid;
	LogicalRepCommitData commit_data;
	ParallelApplyWorkerInfo *winfo;
	TransApplyAction apply_action;

	/*
	 * Save the message before it is consumed, so the original bytes can be
	 * forwarded or serialized below.
	 */
	StringInfoData original_msg = *s;

	if (in_streamed_transaction)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("STREAM COMMIT message without STREAM STOP")));

	xid = logicalrep_read_stream_commit(s, &commit_data);
	set_apply_error_context_xact(xid, commit_data.commit_lsn);

	apply_action = get_transaction_apply_action(xid, &winfo);

	switch (apply_action)
	{
		case TRANS_LEADER_APPLY:

			/*
			 * The transaction has been serialized to file, so replay all the
			 * spooled operations.
			 */
			apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
								   commit_data.commit_lsn);

			apply_handle_commit_internal(&commit_data);

			/* Unlink the files with serialized changes and subxact info. */
			stream_cleanup_files(MyLogicalRepWorker->subid, xid);

			elog(DEBUG1, "finished processing the STREAM COMMIT command");
			break;

		case TRANS_LEADER_SEND_TO_PARALLEL:
			Assert(winfo);

			if (pa_send_data(winfo, s->len, s->data))
			{
				/* Finish processing the streaming transaction. */
				pa_xact_finish(winfo, commit_data.end_lsn);
				break;
			}

			/*
			 * Switch to serialize mode when we are not able to send the
			 * change to parallel apply worker.
			 */
			pa_switch_to_partial_serialize(winfo, true);

			pg_fallthrough;
		case TRANS_LEADER_PARTIAL_SERIALIZE:
			Assert(winfo);

			/* Spool the original message for the parallel apply worker. */
			stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
										 &original_msg);

			pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);

			/* Finish processing the streaming transaction. */
			pa_xact_finish(winfo, commit_data.end_lsn);
			break;

		case TRANS_PARALLEL_APPLY:

			/*
			 * If the parallel apply worker is applying spooled messages then
			 * close the file before committing.
			 */
			if (stream_fd)
				stream_close_file();

			apply_handle_commit_internal(&commit_data);

			MyParallelShared->last_commit_end = XactLastCommitEnd;

			/*
			 * It is important to set the transaction state as finished before
			 * releasing the lock. See pa_wait_for_xact_finish.
			 */
			pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
			pa_unlock_transaction(xid, AccessExclusiveLock);

			pa_reset_subtrans();

			elog(DEBUG1, "finished processing the STREAM COMMIT command");
			break;

		default:
			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
			break;
	}

	/*
	 * Process any tables that are being synchronized in parallel, as well as
	 * any newly added tables or sequences.
	 */
	ProcessSyncingRelations(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);

	reset_apply_error_context_info();
}
2515 :
2516 : /*
2517 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2518 : */
2519 : static void
2520 507 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2521 : {
2522 507 : if (is_skipping_changes())
2523 : {
2524 2 : stop_skipping_changes();
2525 :
2526 : /*
2527 : * Start a new transaction to clear the subskiplsn, if not started
2528 : * yet.
2529 : */
2530 2 : if (!IsTransactionState())
2531 1 : StartTransactionCommand();
2532 : }
2533 :
2534 507 : if (IsTransactionState())
2535 : {
2536 : /*
2537 : * The transaction is either non-empty or skipped, so we clear the
2538 : * subskiplsn.
2539 : */
2540 507 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2541 :
2542 : /*
2543 : * Update origin state so we can restart streaming from correct
2544 : * position in case of crash.
2545 : */
2546 507 : replorigin_xact_state.origin_lsn = commit_data->end_lsn;
2547 507 : replorigin_xact_state.origin_timestamp = commit_data->committime;
2548 :
2549 507 : CommitTransactionCommand();
2550 :
2551 507 : if (IsTransactionBlock())
2552 : {
2553 4 : EndTransactionBlock(false);
2554 4 : CommitTransactionCommand();
2555 : }
2556 :
2557 507 : pgstat_report_stat(false);
2558 :
2559 507 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2560 : }
2561 : else
2562 : {
2563 : /* Process any invalidation messages that might have accumulated. */
2564 0 : AcceptInvalidationMessages();
2565 0 : maybe_reread_subscription();
2566 : }
2567 :
2568 507 : in_remote_transaction = false;
2569 507 : }
2570 :
2571 : /*
2572 : * Handle RELATION message.
2573 : *
2574 : * Note we don't do validation against local schema here. The validation
2575 : * against local schema is postponed until first change for given relation
2576 : * comes as we only care about it when applying changes for it anyway and we
2577 : * do less locking this way.
2578 : */
2579 : static void
2580 511 : apply_handle_relation(StringInfo s)
2581 : {
2582 : LogicalRepRelation *rel;
2583 :
2584 511 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2585 36 : return;
2586 :
2587 475 : rel = logicalrep_read_rel(s);
2588 475 : logicalrep_relmap_update(rel);
2589 :
2590 : /* Also reset all entries in the partition map that refer to remoterel. */
2591 475 : logicalrep_partmap_reset_relmap(rel);
2592 : }
2593 :
2594 : /*
2595 : * Handle TYPE message.
2596 : *
2597 : * This implementation pays no attention to TYPE messages; we expect the user
2598 : * to have set things up so that the incoming data is acceptable to the input
2599 : * functions for the locally subscribed tables. Hence, we just read and
2600 : * discard the message.
2601 : */
2602 : static void
2603 18 : apply_handle_type(StringInfo s)
2604 : {
2605 : LogicalRepTyp typ;
2606 :
2607 18 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2608 0 : return;
2609 :
2610 18 : logicalrep_read_typ(s, &typ);
2611 : }
2612 :
2613 : /*
2614 : * Check that we (the subscription owner) have sufficient privileges on the
2615 : * target relation to perform the given operation.
2616 : */
2617 : static void
2618 244045 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2619 : {
2620 : Oid relid;
2621 : AclResult aclresult;
2622 :
2623 244045 : relid = RelationGetRelid(rel);
2624 244045 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2625 244045 : if (aclresult != ACLCHECK_OK)
2626 10 : aclcheck_error(aclresult,
2627 10 : get_relkind_objtype(rel->rd_rel->relkind),
2628 10 : get_rel_name(relid));
2629 :
2630 : /*
2631 : * We lack the infrastructure to honor RLS policies. It might be possible
2632 : * to add such infrastructure here, but tablesync workers lack it, too, so
2633 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2634 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2635 : * replicate subsequent INSERTs, so we forbid all commands the same.
2636 : */
2637 244035 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2638 4 : ereport(ERROR,
2639 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2640 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2641 : GetUserNameFromId(GetUserId(), true),
2642 : RelationGetRelationName(rel))));
2643 244031 : }
2644 :
2645 : /*
2646 : * Handle INSERT message.
2647 : */
2648 :
2649 : static void
2650 209554 : apply_handle_insert(StringInfo s)
2651 : {
2652 : LogicalRepRelMapEntry *rel;
2653 : LogicalRepTupleData newtup;
2654 : LogicalRepRelId relid;
2655 : UserContext ucxt;
2656 : ApplyExecutionData *edata;
2657 : EState *estate;
2658 : TupleTableSlot *remoteslot;
2659 : MemoryContext oldctx;
2660 : bool run_as_owner;
2661 :
2662 : /*
2663 : * Quick return if we are skipping data modification changes or handling
2664 : * streamed transactions.
2665 : */
2666 409107 : if (is_skipping_changes() ||
2667 199553 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2668 110059 : return;
2669 :
2670 99545 : begin_replication_step();
2671 :
2672 99544 : relid = logicalrep_read_insert(s, &newtup);
2673 99544 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2674 99530 : if (!should_apply_changes_for_rel(rel))
2675 : {
2676 : /*
2677 : * The relation can't become interesting in the middle of the
2678 : * transaction so it's safe to unlock it.
2679 : */
2680 50 : logicalrep_rel_close(rel, RowExclusiveLock);
2681 50 : end_replication_step();
2682 50 : return;
2683 : }
2684 :
2685 : /*
2686 : * Make sure that any user-supplied code runs as the table owner, unless
2687 : * the user has opted out of that behavior.
2688 : */
2689 99480 : run_as_owner = MySubscription->runasowner;
2690 99480 : if (!run_as_owner)
2691 99472 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2692 :
2693 : /* Set relation for error callback */
2694 99480 : apply_error_callback_arg.rel = rel;
2695 :
2696 : /* Initialize the executor state. */
2697 99480 : edata = create_edata_for_relation(rel);
2698 99480 : estate = edata->estate;
2699 99480 : remoteslot = ExecInitExtraTupleSlot(estate,
2700 99480 : RelationGetDescr(rel->localrel),
2701 : &TTSOpsVirtual);
2702 :
2703 : /* Process and store remote tuple in the slot */
2704 99480 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2705 99480 : slot_store_data(remoteslot, rel, &newtup);
2706 99480 : slot_fill_defaults(rel, estate, remoteslot);
2707 99480 : MemoryContextSwitchTo(oldctx);
2708 :
2709 : /* For a partitioned table, insert the tuple into a partition. */
2710 99480 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2711 69 : apply_handle_tuple_routing(edata,
2712 : remoteslot, NULL, CMD_INSERT);
2713 : else
2714 : {
2715 99411 : ResultRelInfo *relinfo = edata->targetRelInfo;
2716 :
2717 99411 : ExecOpenIndices(relinfo, false);
2718 99411 : apply_handle_insert_internal(edata, relinfo, remoteslot);
2719 99394 : ExecCloseIndices(relinfo);
2720 : }
2721 :
2722 99438 : finish_edata(edata);
2723 :
2724 : /* Reset relation for error callback */
2725 99438 : apply_error_callback_arg.rel = NULL;
2726 :
2727 99438 : if (!run_as_owner)
2728 99433 : RestoreUserContext(&ucxt);
2729 :
2730 99438 : logicalrep_rel_close(rel, NoLock);
2731 :
2732 99438 : end_replication_step();
2733 : }
2734 :
2735 : /*
2736 : * Workhorse for apply_handle_insert()
2737 : * relinfo is for the relation we're actually inserting into
2738 : * (could be a child partition of edata->targetRelInfo)
2739 : */
2740 : static void
2741 99481 : apply_handle_insert_internal(ApplyExecutionData *edata,
2742 : ResultRelInfo *relinfo,
2743 : TupleTableSlot *remoteslot)
2744 : {
2745 99481 : EState *estate = edata->estate;
2746 :
2747 : /* Caller should have opened indexes already. */
2748 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2749 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2750 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2751 :
2752 : /* Caller will not have done this bit. */
2753 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2754 99481 : InitConflictIndexes(relinfo);
2755 :
2756 : /* Do the insert. */
2757 99481 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2758 99474 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2759 99439 : }
2760 :
2761 : /*
2762 : * Check if the logical replication relation is updatable and throw
2763 : * appropriate error if it isn't.
2764 : */
2765 : static void
2766 72311 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2767 : {
2768 : /*
2769 : * For partitioned tables, we only need to care if the target partition is
2770 : * updatable (aka has PK or RI defined for it).
2771 : */
2772 72311 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2773 30 : return;
2774 :
2775 : /* Updatable, no error. */
2776 72281 : if (rel->updatable)
2777 72281 : return;
2778 :
2779 : /*
2780 : * We are in error mode so it's fine this is somewhat slow. It's better to
2781 : * give user correct error.
2782 : */
2783 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2784 : {
2785 0 : ereport(ERROR,
2786 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2787 : errmsg("publisher did not send replica identity column "
2788 : "expected by the logical replication target relation \"%s.%s\"",
2789 : rel->remoterel.nspname, rel->remoterel.relname)));
2790 : }
2791 :
2792 0 : ereport(ERROR,
2793 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2794 : errmsg("logical replication target relation \"%s.%s\" has "
2795 : "neither REPLICA IDENTITY index nor PRIMARY "
2796 : "KEY and published relation does not have "
2797 : "REPLICA IDENTITY FULL",
2798 : rel->remoterel.nspname, rel->remoterel.relname)));
2799 : }
2800 :
2801 : /*
2802 : * Handle UPDATE message.
2803 : *
2804 : * TODO: FDW support
2805 : */
2806 : static void
2807 66178 : apply_handle_update(StringInfo s)
2808 : {
2809 : LogicalRepRelMapEntry *rel;
2810 : LogicalRepRelId relid;
2811 : UserContext ucxt;
2812 : ApplyExecutionData *edata;
2813 : EState *estate;
2814 : LogicalRepTupleData oldtup;
2815 : LogicalRepTupleData newtup;
2816 : bool has_oldtup;
2817 : TupleTableSlot *remoteslot;
2818 : RTEPermissionInfo *target_perminfo;
2819 : MemoryContext oldctx;
2820 : bool run_as_owner;
2821 :
2822 : /*
2823 : * Quick return if we are skipping data modification changes or handling
2824 : * streamed transactions.
2825 : */
2826 132353 : if (is_skipping_changes() ||
2827 66175 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
2828 34223 : return;
2829 :
2830 31955 : begin_replication_step();
2831 :
2832 31954 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2833 : &newtup);
2834 31954 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2835 31954 : if (!should_apply_changes_for_rel(rel))
2836 : {
2837 : /*
2838 : * The relation can't become interesting in the middle of the
2839 : * transaction so it's safe to unlock it.
2840 : */
2841 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2842 0 : end_replication_step();
2843 0 : return;
2844 : }
2845 :
2846 : /* Set relation for error callback */
2847 31954 : apply_error_callback_arg.rel = rel;
2848 :
2849 : /* Check if we can do the update. */
2850 31954 : check_relation_updatable(rel);
2851 :
2852 : /*
2853 : * Make sure that any user-supplied code runs as the table owner, unless
2854 : * the user has opted out of that behavior.
2855 : */
2856 31954 : run_as_owner = MySubscription->runasowner;
2857 31954 : if (!run_as_owner)
2858 31950 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2859 :
2860 : /* Initialize the executor state. */
2861 31953 : edata = create_edata_for_relation(rel);
2862 31953 : estate = edata->estate;
2863 31953 : remoteslot = ExecInitExtraTupleSlot(estate,
2864 31953 : RelationGetDescr(rel->localrel),
2865 : &TTSOpsVirtual);
2866 :
2867 : /*
2868 : * Populate updatedCols so that per-column triggers can fire, and so
2869 : * executor can correctly pass down indexUnchanged hint. This could
2870 : * include more columns than were actually changed on the publisher
2871 : * because the logical replication protocol doesn't contain that
2872 : * information. But it would for example exclude columns that only exist
2873 : * on the subscriber, since we are not touching those.
2874 : */
2875 31953 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
2876 159371 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2877 : {
2878 127418 : CompactAttribute *att = TupleDescCompactAttr(remoteslot->tts_tupleDescriptor, i);
2879 127418 : int remoteattnum = rel->attrmap->attnums[i];
2880 :
2881 127418 : if (!att->attisdropped && remoteattnum >= 0)
2882 : {
2883 68901 : if (remoteattnum >= newtup.ncols)
2884 0 : ereport(ERROR,
2885 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2886 : errmsg("logical replication column %d not found in tuple: only %d column(s) received",
2887 : remoteattnum + 1, newtup.ncols)));
2888 :
2889 68901 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2890 68901 : target_perminfo->updatedCols =
2891 68901 : bms_add_member(target_perminfo->updatedCols,
2892 : i + 1 - FirstLowInvalidHeapAttributeNumber);
2893 : }
2894 : }
2895 :
2896 : /* Build the search tuple. */
2897 31953 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2898 31953 : slot_store_data(remoteslot, rel,
2899 31953 : has_oldtup ? &oldtup : &newtup);
2900 31953 : MemoryContextSwitchTo(oldctx);
2901 :
2902 : /* For a partitioned table, apply update to correct partition. */
2903 31953 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2904 13 : apply_handle_tuple_routing(edata,
2905 : remoteslot, &newtup, CMD_UPDATE);
2906 : else
2907 31940 : apply_handle_update_internal(edata, edata->targetRelInfo,
2908 : remoteslot, &newtup, rel->localindexoid);
2909 :
2910 31944 : finish_edata(edata);
2911 :
2912 : /* Reset relation for error callback */
2913 31944 : apply_error_callback_arg.rel = NULL;
2914 :
2915 31944 : if (!run_as_owner)
2916 31942 : RestoreUserContext(&ucxt);
2917 :
2918 31944 : logicalrep_rel_close(rel, NoLock);
2919 :
2920 31944 : end_replication_step();
2921 : }
2922 :
2923 : /*
2924 : * Workhorse for apply_handle_update()
2925 : * relinfo is for the relation we're actually updating in
2926 : * (could be a child partition of edata->targetRelInfo)
2927 : */
2928 : static void
2929 31940 : apply_handle_update_internal(ApplyExecutionData *edata,
2930 : ResultRelInfo *relinfo,
2931 : TupleTableSlot *remoteslot,
2932 : LogicalRepTupleData *newtup,
2933 : Oid localindexoid)
2934 : {
2935 31940 : EState *estate = edata->estate;
2936 31940 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2937 31940 : Relation localrel = relinfo->ri_RelationDesc;
2938 : EPQState epqstate;
2939 31940 : TupleTableSlot *localslot = NULL;
2940 31940 : ConflictTupleInfo conflicttuple = {0};
2941 : bool found;
2942 : MemoryContext oldctx;
2943 :
2944 31940 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2945 31940 : ExecOpenIndices(relinfo, false);
2946 :
2947 31940 : found = FindReplTupleInLocalRel(edata, localrel,
2948 : &relmapentry->remoterel,
2949 : localindexoid,
2950 : remoteslot, &localslot);
2951 :
2952 : /*
2953 : * Tuple found.
2954 : *
2955 : * Note this will fail if there are other conflicting unique indexes.
2956 : */
2957 31933 : if (found)
2958 : {
2959 : /*
2960 : * Report the conflict if the tuple was modified by a different
2961 : * origin.
2962 : */
2963 31919 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2964 2 : &conflicttuple.origin, &conflicttuple.ts) &&
2965 2 : conflicttuple.origin != replorigin_xact_state.origin)
2966 : {
2967 : TupleTableSlot *newslot;
2968 :
2969 : /* Store the new tuple for conflict reporting */
2970 2 : newslot = table_slot_create(localrel, &estate->es_tupleTable);
2971 2 : slot_store_data(newslot, relmapentry, newtup);
2972 :
2973 2 : conflicttuple.slot = localslot;
2974 :
2975 2 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
2976 : remoteslot, newslot,
2977 : list_make1(&conflicttuple));
2978 : }
2979 :
2980 : /* Process and store remote tuple in the slot */
2981 31919 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2982 31919 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2983 31919 : MemoryContextSwitchTo(oldctx);
2984 :
2985 31919 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2986 :
2987 31919 : InitConflictIndexes(relinfo);
2988 :
2989 : /* Do the actual update. */
2990 31919 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2991 31919 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2992 : remoteslot);
2993 : }
2994 : else
2995 : {
2996 : ConflictType type;
2997 14 : TupleTableSlot *newslot = localslot;
2998 :
2999 : /*
3000 : * Detecting whether the tuple was recently deleted or never existed
3001 : * is crucial to avoid misleading the user during conflict handling.
3002 : */
3003 14 : if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
3004 : &conflicttuple.xmin,
3005 : &conflicttuple.origin,
3006 3 : &conflicttuple.ts) &&
3007 3 : conflicttuple.origin != replorigin_xact_state.origin)
3008 3 : type = CT_UPDATE_DELETED;
3009 : else
3010 11 : type = CT_UPDATE_MISSING;
3011 :
3012 : /* Store the new tuple for conflict reporting */
3013 14 : slot_store_data(newslot, relmapentry, newtup);
3014 :
3015 : /*
3016 : * The tuple to be updated could not be found or was deleted. Do
3017 : * nothing except for emitting a log message.
3018 : */
3019 14 : ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
3020 : list_make1(&conflicttuple));
3021 : }
3022 :
3023 : /* Cleanup. */
3024 31931 : ExecCloseIndices(relinfo);
3025 31931 : EvalPlanQualEnd(&epqstate);
3026 31931 : }
3027 :
3028 : /*
3029 : * Handle DELETE message.
3030 : *
3031 : * TODO: FDW support
3032 : */
3033 : static void
3034 81942 : apply_handle_delete(StringInfo s)
3035 : {
3036 : LogicalRepRelMapEntry *rel;
3037 : LogicalRepTupleData oldtup;
3038 : LogicalRepRelId relid;
3039 : UserContext ucxt;
3040 : ApplyExecutionData *edata;
3041 : EState *estate;
3042 : TupleTableSlot *remoteslot;
3043 : MemoryContext oldctx;
3044 : bool run_as_owner;
3045 :
3046 : /*
3047 : * Quick return if we are skipping data modification changes or handling
3048 : * streamed transactions.
3049 : */
3050 163884 : if (is_skipping_changes() ||
3051 81942 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
3052 41615 : return;
3053 :
3054 40327 : begin_replication_step();
3055 :
3056 40327 : relid = logicalrep_read_delete(s, &oldtup);
3057 40327 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
3058 40327 : if (!should_apply_changes_for_rel(rel))
3059 : {
3060 : /*
3061 : * The relation can't become interesting in the middle of the
3062 : * transaction so it's safe to unlock it.
3063 : */
3064 0 : logicalrep_rel_close(rel, RowExclusiveLock);
3065 0 : end_replication_step();
3066 0 : return;
3067 : }
3068 :
3069 : /* Set relation for error callback */
3070 40327 : apply_error_callback_arg.rel = rel;
3071 :
3072 : /* Check if we can do the delete. */
3073 40327 : check_relation_updatable(rel);
3074 :
3075 : /*
3076 : * Make sure that any user-supplied code runs as the table owner, unless
3077 : * the user has opted out of that behavior.
3078 : */
3079 40327 : run_as_owner = MySubscription->runasowner;
3080 40327 : if (!run_as_owner)
3081 40325 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3082 :
3083 : /* Initialize the executor state. */
3084 40327 : edata = create_edata_for_relation(rel);
3085 40327 : estate = edata->estate;
3086 40327 : remoteslot = ExecInitExtraTupleSlot(estate,
3087 40327 : RelationGetDescr(rel->localrel),
3088 : &TTSOpsVirtual);
3089 :
3090 : /* Build the search tuple. */
3091 40327 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3092 40327 : slot_store_data(remoteslot, rel, &oldtup);
3093 40327 : MemoryContextSwitchTo(oldctx);
3094 :
3095 : /* For a partitioned table, apply delete to correct partition. */
3096 40327 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3097 17 : apply_handle_tuple_routing(edata,
3098 : remoteslot, NULL, CMD_DELETE);
3099 : else
3100 : {
3101 40310 : ResultRelInfo *relinfo = edata->targetRelInfo;
3102 :
3103 40310 : ExecOpenIndices(relinfo, false);
3104 40310 : apply_handle_delete_internal(edata, relinfo,
3105 : remoteslot, rel->localindexoid);
3106 40310 : ExecCloseIndices(relinfo);
3107 : }
3108 :
3109 40327 : finish_edata(edata);
3110 :
3111 : /* Reset relation for error callback */
3112 40327 : apply_error_callback_arg.rel = NULL;
3113 :
3114 40327 : if (!run_as_owner)
3115 40325 : RestoreUserContext(&ucxt);
3116 :
3117 40327 : logicalrep_rel_close(rel, NoLock);
3118 :
3119 40327 : end_replication_step();
3120 : }
3121 :
3122 : /*
3123 : * Workhorse for apply_handle_delete()
3124 : * relinfo is for the relation we're actually deleting from
3125 : * (could be a child partition of edata->targetRelInfo)
3126 : */
3127 : static void
3128 40327 : apply_handle_delete_internal(ApplyExecutionData *edata,
3129 : ResultRelInfo *relinfo,
3130 : TupleTableSlot *remoteslot,
3131 : Oid localindexoid)
3132 : {
3133 40327 : EState *estate = edata->estate;
3134 40327 : Relation localrel = relinfo->ri_RelationDesc;
3135 40327 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3136 : EPQState epqstate;
3137 : TupleTableSlot *localslot;
3138 40327 : ConflictTupleInfo conflicttuple = {0};
3139 : bool found;
3140 :
3141 40327 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3142 :
3143 : /* Caller should have opened indexes already. */
3144 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
3145 : !localrel->rd_rel->relhasindex ||
3146 : RelationGetIndexList(localrel) == NIL);
3147 :
3148 40327 : found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3149 : remoteslot, &localslot);
3150 :
3151 : /* If found delete it. */
3152 40327 : if (found)
3153 : {
3154 : /*
3155 : * Report the conflict if the tuple was modified by a different
3156 : * origin.
3157 : */
3158 40318 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3159 6 : &conflicttuple.origin, &conflicttuple.ts) &&
3160 6 : conflicttuple.origin != replorigin_xact_state.origin)
3161 : {
3162 5 : conflicttuple.slot = localslot;
3163 5 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
3164 : remoteslot, NULL,
3165 : list_make1(&conflicttuple));
3166 : }
3167 :
3168 40318 : EvalPlanQualSetSlot(&epqstate, localslot);
3169 :
3170 : /* Do the actual delete. */
3171 40318 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3172 40318 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3173 : }
3174 : else
3175 : {
3176 : /*
3177 : * The tuple to be deleted could not be found. Do nothing except for
3178 : * emitting a log message.
3179 : */
3180 9 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
3181 : remoteslot, NULL, list_make1(&conflicttuple));
3182 : }
3183 :
3184 : /* Cleanup. */
3185 40327 : EvalPlanQualEnd(&epqstate);
3186 40327 : }
3187 :
3188 : /*
3189 : * Try to find a tuple received from the publication side (in 'remoteslot') in
3190 : * the corresponding local relation using either replica identity index,
3191 : * primary key, index or if needed, sequential scan.
3192 : *
3193 : * Local tuple, if found, is returned in '*localslot'.
3194 : */
3195 : static bool
3196 72280 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
3197 : LogicalRepRelation *remoterel,
3198 : Oid localidxoid,
3199 : TupleTableSlot *remoteslot,
3200 : TupleTableSlot **localslot)
3201 : {
3202 72280 : EState *estate = edata->estate;
3203 : bool found;
3204 :
3205 : /*
3206 : * Regardless of the top-level operation, we're performing a read here, so
3207 : * check for SELECT privileges.
3208 : */
3209 72280 : TargetPrivilegesCheck(localrel, ACL_SELECT);
3210 :
3211 72273 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3212 :
3213 : Assert(OidIsValid(localidxoid) ||
3214 : (remoterel->replident == REPLICA_IDENTITY_FULL));
3215 :
3216 72273 : if (OidIsValid(localidxoid))
3217 : {
3218 : #ifdef USE_ASSERT_CHECKING
3219 : Relation idxrel = index_open(localidxoid, AccessShareLock);
3220 :
3221 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3222 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
3223 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
3224 : IsIndexUsableForReplicaIdentityFull(idxrel,
3225 : edata->targetRel->attrmap)));
3226 : index_close(idxrel, AccessShareLock);
3227 : #endif
3228 :
3229 72120 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
3230 : LockTupleExclusive,
3231 : remoteslot, *localslot);
3232 : }
3233 : else
3234 153 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
3235 : remoteslot, *localslot);
3236 :
3237 72273 : return found;
3238 : }
3239 :
3240 : /*
3241 : * Determine whether the index can reliably locate the deleted tuple in the
3242 : * local relation.
3243 : *
3244 : * An index may exclude deleted tuples if it was re-indexed or re-created during
3245 : * change application. Therefore, an index is considered usable only if the
3246 : * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3247 : * index tuple's xmin. This ensures that any tuples deleted prior to the index
3248 : * creation or re-indexing are not relevant for conflict detection in the
3249 : * current apply worker.
3250 : *
3251 : * Note that indexes may also be excluded if they were modified by other DDL
3252 : * operations, such as ALTER INDEX. However, this is acceptable, as the
3253 : * likelihood of such DDL changes coinciding with the need to scan dead
3254 : * tuples for the update_deleted is low.
3255 : */
3256 : static bool
3257 1 : IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
3258 : TransactionId conflict_detection_xmin)
3259 : {
3260 : HeapTuple index_tuple;
3261 : TransactionId index_xmin;
3262 :
3263 1 : index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
3264 :
3265 1 : if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3266 0 : elog(ERROR, "cache lookup failed for index %u", localindexoid);
3267 :
3268 : /*
3269 : * No need to check for a frozen transaction ID, as
3270 : * TransactionIdPrecedes() manages it internally, treating it as falling
3271 : * behind the conflict_detection_xmin.
3272 : */
3273 1 : index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
3274 :
3275 1 : ReleaseSysCache(index_tuple);
3276 :
3277 1 : return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
3278 : }
3279 :
3280 : /*
3281 : * Attempts to locate a deleted tuple in the local relation that matches the
3282 : * values of the tuple received from the publication side (in 'remoteslot').
3283 : * The search is performed using either the replica identity index, primary
3284 : * key, other available index, or a sequential scan if necessary.
3285 : *
3286 : * Returns true if the deleted tuple is found. If found, the transaction ID,
3287 : * origin, and commit timestamp of the deletion are stored in '*delete_xid',
3288 : * '*delete_origin', and '*delete_time' respectively.
3289 : */
3290 : static bool
3291 16 : FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
3292 : TupleTableSlot *remoteslot,
3293 : TransactionId *delete_xid, ReplOriginId *delete_origin,
3294 : TimestampTz *delete_time)
3295 : {
3296 : TransactionId oldestxmin;
3297 :
3298 : /*
3299 : * Return false if either dead tuples are not retained or commit timestamp
3300 : * data is not available.
3301 : */
3302 16 : if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
3303 13 : return false;
3304 :
3305 : /*
3306 : * For conflict detection, we use the leader worker's
3307 : * oldest_nonremovable_xid value instead of invoking
3308 : * GetOldestNonRemovableTransactionId() or using the conflict detection
3309 : * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3310 : * identify tuples that were recently deleted. These deleted tuples are no
3311 : * longer visible to concurrent transactions. However, if a remote update
3312 : * matches such a tuple, we log an update_deleted conflict.
3313 : *
3314 : * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3315 : * transaction IDs older than oldest_nonremovable_xid, for our current
3316 : * purpose, it is acceptable to treat tuples deleted by transactions prior
3317 : * to oldest_nonremovable_xid as update_missing conflicts.
3318 : */
3319 3 : if (am_leader_apply_worker())
3320 : {
3321 3 : oldestxmin = MyLogicalRepWorker->oldest_nonremovable_xid;
3322 : }
3323 : else
3324 : {
3325 : LogicalRepWorker *leader;
3326 :
3327 : /*
3328 : * Obtain the information from the leader apply worker as only the
3329 : * leader manages oldest_nonremovable_xid (see
3330 : * maybe_advance_nonremovable_xid() for details).
3331 : */
3332 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
3333 0 : leader = logicalrep_worker_find(WORKERTYPE_APPLY,
3334 0 : MyLogicalRepWorker->subid, InvalidOid,
3335 : false);
3336 0 : if (!leader)
3337 : {
3338 0 : ereport(ERROR,
3339 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3340 : errmsg("could not detect conflict as the leader apply worker has exited")));
3341 : }
3342 :
3343 0 : SpinLockAcquire(&leader->relmutex);
3344 0 : oldestxmin = leader->oldest_nonremovable_xid;
3345 0 : SpinLockRelease(&leader->relmutex);
3346 0 : LWLockRelease(LogicalRepWorkerLock);
3347 : }
3348 :
3349 : /*
3350 : * Return false if the leader apply worker has stopped retaining
3351 : * information for detecting conflicts. This implies that update_deleted
3352 : * can no longer be reliably detected.
3353 : */
3354 3 : if (!TransactionIdIsValid(oldestxmin))
3355 0 : return false;
3356 :
3357 4 : if (OidIsValid(localidxoid) &&
3358 1 : IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
3359 1 : return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
3360 : remoteslot, oldestxmin,
3361 : delete_xid, delete_origin,
3362 : delete_time);
3363 : else
3364 2 : return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
3365 : oldestxmin, delete_xid,
3366 : delete_origin, delete_time);
3367 : }
3368 :
/*
 * This handles insert, update, delete on a partitioned table.
 *
 * The remote tuple arrives in the parent table's rowtype; we route it to the
 * matching leaf partition, convert it to the partition's rowtype, and apply
 * the operation there.  For UPDATE, if the updated tuple no longer satisfies
 * the current partition's constraint, the change is applied as a DELETE from
 * the old partition plus an INSERT into the newly routed partition.
 *
 * 'remoteslot' holds the "search tuple" in the parent's rowtype; 'newtup'
 * carries the replacement column values (used by UPDATE only).
 */
static void
apply_handle_tuple_routing(ApplyExecutionData *edata,
						   TupleTableSlot *remoteslot,
						   LogicalRepTupleData *newtup,
						   CmdType operation)
{
	EState	   *estate = edata->estate;
	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
	ResultRelInfo *relinfo = edata->targetRelInfo;
	Relation	parentrel = relinfo->ri_RelationDesc;
	ModifyTableState *mtstate;
	PartitionTupleRouting *proute;
	ResultRelInfo *partrelinfo;
	Relation	partrel;
	TupleTableSlot *remoteslot_part;
	TupleConversionMap *map;
	MemoryContext oldctx;
	LogicalRepRelMapEntry *part_entry = NULL;
	AttrMap    *attrmap = NULL;

	/* ModifyTableState is needed for ExecFindPartition(). */
	edata->mtstate = mtstate = makeNode(ModifyTableState);
	mtstate->ps.plan = NULL;
	mtstate->ps.state = estate;
	mtstate->operation = operation;
	mtstate->resultRelInfo = relinfo;

	/* ... as is PartitionTupleRouting. */
	edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);

	/*
	 * Find the partition to which the "search tuple" belongs.
	 */
	Assert(remoteslot != NULL);
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
									remoteslot, estate);
	Assert(partrelinfo != NULL);
	partrel = partrelinfo->ri_RelationDesc;

	/*
	 * Check for supported relkind.  We need this since partitions might be of
	 * unsupported relkinds; and the set of partitions can change, so checking
	 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
	 */
	CheckSubscriptionRelkind(partrel->rd_rel->relkind,
							 relmapentry->remoterel.relkind,
							 get_namespace_name(RelationGetNamespace(partrel)),
							 RelationGetRelationName(partrel));

	/*
	 * To perform any of the operations below, the tuple must match the
	 * partition's rowtype.  Convert if needed or just copy, using a dedicated
	 * slot to store the tuple in any case.
	 */
	remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
	if (remoteslot_part == NULL)
		remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
	map = ExecGetRootToChildMap(partrelinfo, estate);
	if (map != NULL)
	{
		attrmap = map->attrMap;
		remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
												remoteslot_part);
	}
	else
	{
		remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
		slot_getallattrs(remoteslot_part);
	}
	MemoryContextSwitchTo(oldctx);

	/* Check if we can do the update or delete on the leaf partition. */
	if (operation == CMD_UPDATE || operation == CMD_DELETE)
	{
		part_entry = logicalrep_partition_open(relmapentry, partrel,
											   attrmap);
		check_relation_updatable(part_entry);
	}

	switch (operation)
	{
		case CMD_INSERT:
			apply_handle_insert_internal(edata, partrelinfo,
										 remoteslot_part);
			break;

		case CMD_DELETE:
			apply_handle_delete_internal(edata, partrelinfo,
										 remoteslot_part,
										 part_entry->localindexoid);
			break;

		case CMD_UPDATE:

			/*
			 * For UPDATE, depending on whether or not the updated tuple
			 * satisfies the partition's constraint, perform a simple UPDATE
			 * of the partition or move the updated tuple into a different
			 * suitable partition.
			 */
			{
				TupleTableSlot *localslot;
				ResultRelInfo *partrelinfo_new;
				Relation	partrel_new;
				bool		found;
				EPQState	epqstate;
				ConflictTupleInfo conflicttuple = {0};

				/* Get the matching local tuple from the partition. */
				found = FindReplTupleInLocalRel(edata, partrel,
												&part_entry->remoterel,
												part_entry->localindexoid,
												remoteslot_part, &localslot);
				if (!found)
				{
					ConflictType type;
					TupleTableSlot *newslot = localslot;

					/*
					 * Detecting whether the tuple was recently deleted or
					 * never existed is crucial to avoid misleading the user
					 * during conflict handling.
					 */
					if (FindDeletedTupleInLocalRel(partrel,
												   part_entry->localindexoid,
												   remoteslot_part,
												   &conflicttuple.xmin,
												   &conflicttuple.origin,
												   &conflicttuple.ts) &&
						conflicttuple.origin != replorigin_xact_state.origin)
						type = CT_UPDATE_DELETED;
					else
						type = CT_UPDATE_MISSING;

					/* Store the new tuple for conflict reporting */
					slot_store_data(newslot, part_entry, newtup);

					/*
					 * The tuple to be updated could not be found or was
					 * deleted.  Do nothing except for emitting a log message.
					 */
					ReportApplyConflict(estate, partrelinfo, LOG,
										type, remoteslot_part, newslot,
										list_make1(&conflicttuple));

					return;
				}

				/*
				 * Report the conflict if the tuple was modified by a
				 * different origin.
				 */
				if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
											&conflicttuple.origin,
											&conflicttuple.ts) &&
					conflicttuple.origin != replorigin_xact_state.origin)
				{
					TupleTableSlot *newslot;

					/* Store the new tuple for conflict reporting */
					newslot = table_slot_create(partrel, &estate->es_tupleTable);
					slot_store_data(newslot, part_entry, newtup);

					conflicttuple.slot = localslot;

					ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
										remoteslot_part, newslot,
										list_make1(&conflicttuple));
				}

				/*
				 * Apply the update to the local tuple, putting the result in
				 * remoteslot_part.
				 */
				oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
				slot_modify_data(remoteslot_part, localslot, part_entry,
								 newtup);
				MemoryContextSwitchTo(oldctx);

				EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);

				/*
				 * Does the updated tuple still satisfy the current
				 * partition's constraint?
				 */
				if (!partrel->rd_rel->relispartition ||
					ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
									   false))
				{
					/*
					 * Yes, so simply UPDATE the partition.  We don't call
					 * apply_handle_update_internal() here, which would
					 * normally do the following work, to avoid repeating some
					 * work already done above to find the local tuple in the
					 * partition.
					 */
					InitConflictIndexes(partrelinfo);

					EvalPlanQualSetSlot(&epqstate, remoteslot_part);
					TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
										  ACL_UPDATE);
					ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
											 localslot, remoteslot_part);
				}
				else
				{
					/* Move the tuple into the new partition. */

					/*
					 * New partition will be found using tuple routing, which
					 * can only occur via the parent table.  We might need to
					 * convert the tuple to the parent's rowtype.  Note that
					 * this is the tuple found in the partition, not the
					 * original search tuple received by this function.
					 */
					if (map)
					{
						TupleConversionMap *PartitionToRootMap =
							convert_tuples_by_name(RelationGetDescr(partrel),
												   RelationGetDescr(parentrel));

						remoteslot =
							execute_attr_map_slot(PartitionToRootMap->attrMap,
												  remoteslot_part, remoteslot);
					}
					else
					{
						remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
						slot_getallattrs(remoteslot);
					}

					/* Find the new partition. */
					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
					partrelinfo_new = ExecFindPartition(mtstate, relinfo,
														proute, remoteslot,
														estate);
					MemoryContextSwitchTo(oldctx);
					Assert(partrelinfo_new != partrelinfo);
					partrel_new = partrelinfo_new->ri_RelationDesc;

					/* Check that new partition also has supported relkind. */
					CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
											 relmapentry->remoterel.relkind,
											 get_namespace_name(RelationGetNamespace(partrel_new)),
											 RelationGetRelationName(partrel_new));

					/* DELETE old tuple found in the old partition. */
					EvalPlanQualSetSlot(&epqstate, localslot);
					TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
					ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);

					/* INSERT new tuple into the new partition. */

					/*
					 * Convert the replacement tuple to match the destination
					 * partition rowtype.
					 */
					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
					remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
					if (remoteslot_part == NULL)
						remoteslot_part = table_slot_create(partrel_new,
															&estate->es_tupleTable);
					map = ExecGetRootToChildMap(partrelinfo_new, estate);
					if (map != NULL)
					{
						remoteslot_part = execute_attr_map_slot(map->attrMap,
																remoteslot,
																remoteslot_part);
					}
					else
					{
						remoteslot_part = ExecCopySlot(remoteslot_part,
													   remoteslot);
						slot_getallattrs(remoteslot);
					}
					MemoryContextSwitchTo(oldctx);
					apply_handle_insert_internal(edata, partrelinfo_new,
												 remoteslot_part);
				}

				EvalPlanQualEnd(&epqstate);
			}
			break;

		default:
			elog(ERROR, "unrecognized CmdType: %d", (int) operation);
			break;
	}
}
3662 :
/*
 * Handle TRUNCATE message.
 *
 * Opens each remote-mapped relation with AccessExclusiveLock, expands
 * partitioned tables to their partitions, and truncates them all in a single
 * ExecuteTruncateGuts() call.  Cascading is deliberately not replayed (see
 * comment below).
 *
 * TODO: FDW support
 */
static void
apply_handle_truncate(StringInfo s)
{
	bool		cascade = false;
	bool		restart_seqs = false;
	List	   *remote_relids = NIL;	/* relation ids from the message */
	List	   *remote_rels = NIL;	/* opened LogicalRepRelMapEntry entries */
	List	   *rels = NIL;			/* all local relations to truncate */
	List	   *part_rels = NIL;	/* partitions opened by us (to close) */
	List	   *relids = NIL;
	List	   *relids_logged = NIL;	/* subset needing logical decoding */
	ListCell   *lc;
	LOCKMODE	lockmode = AccessExclusiveLock;

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
		return;

	begin_replication_step();

	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

	foreach(lc, remote_relids)
	{
		LogicalRepRelId relid = lfirst_oid(lc);
		LogicalRepRelMapEntry *rel;

		rel = logicalrep_rel_open(relid, lockmode);
		if (!should_apply_changes_for_rel(rel))
		{
			/*
			 * The relation can't become interesting in the middle of the
			 * transaction so it's safe to unlock it.
			 */
			logicalrep_rel_close(rel, lockmode);
			continue;
		}

		remote_rels = lappend(remote_rels, rel);
		TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
		rels = lappend(rels, rel->localrel);
		relids = lappend_oid(relids, rel->localreloid);
		if (RelationIsLogicallyLogged(rel->localrel))
			relids_logged = lappend_oid(relids_logged, rel->localreloid);

		/*
		 * Truncate partitions if we got a message to truncate a partitioned
		 * table.
		 */
		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		{
			ListCell   *child;
			List	   *children = find_all_inheritors(rel->localreloid,
													   lockmode,
													   NULL);

			foreach(child, children)
			{
				Oid			childrelid = lfirst_oid(child);
				Relation	childrel;

				/* Skip relations already collected (e.g. the parent). */
				if (list_member_oid(relids, childrelid))
					continue;

				/* find_all_inheritors already got lock */
				childrel = table_open(childrelid, NoLock);

				/*
				 * Ignore temp tables of other backends.  See similar code in
				 * ExecuteTruncate().
				 */
				if (RELATION_IS_OTHER_TEMP(childrel))
				{
					table_close(childrel, lockmode);
					continue;
				}

				TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
				rels = lappend(rels, childrel);
				part_rels = lappend(part_rels, childrel);
				relids = lappend_oid(relids, childrelid);
				/* Log this relation only if needed for logical decoding */
				if (RelationIsLogicallyLogged(childrel))
					relids_logged = lappend_oid(relids_logged, childrelid);
			}
		}
	}

	/*
	 * Even if we used CASCADE on the upstream primary we explicitly default
	 * to replaying changes without further cascading.  This might be later
	 * changeable with a user specified option.
	 *
	 * MySubscription->runasowner tells us whether we want to execute
	 * replication actions as the subscription owner; the last argument to
	 * TruncateGuts tells it whether we want to switch to the table owner.
	 * Those are exactly opposite conditions.
	 */
	ExecuteTruncateGuts(rels,
						relids,
						relids_logged,
						DROP_RESTRICT,
						restart_seqs,
						!MySubscription->runasowner);

	/* Close relations but keep locks until transaction end. */
	foreach(lc, remote_rels)
	{
		LogicalRepRelMapEntry *rel = lfirst(lc);

		logicalrep_rel_close(rel, NoLock);
	}
	foreach(lc, part_rels)
	{
		Relation	rel = lfirst(lc);

		table_close(rel, NoLock);
	}

	end_replication_step();
}
3791 :
3792 :
/*
 * Logical replication protocol message dispatcher.
 *
 * Reads the one-byte message type from 's' and hands the remainder of the
 * message to the matching apply_handle_* routine.  An unknown message type
 * is a protocol violation and raises an ERROR.
 */
void
apply_dispatch(StringInfo s)
{
	LogicalRepMsgType action = pq_getmsgbyte(s);
	LogicalRepMsgType saved_command;

	/*
	 * Set the current command being applied.  Since this function can be
	 * called recursively when applying spooled changes, save the current
	 * command so it can be restored afterwards for the error context
	 * callback.
	 */
	saved_command = apply_error_callback_arg.command;
	apply_error_callback_arg.command = action;

	switch (action)
	{
		case LOGICAL_REP_MSG_BEGIN:
			apply_handle_begin(s);
			break;

		case LOGICAL_REP_MSG_COMMIT:
			apply_handle_commit(s);
			break;

		case LOGICAL_REP_MSG_INSERT:
			apply_handle_insert(s);
			break;

		case LOGICAL_REP_MSG_UPDATE:
			apply_handle_update(s);
			break;

		case LOGICAL_REP_MSG_DELETE:
			apply_handle_delete(s);
			break;

		case LOGICAL_REP_MSG_TRUNCATE:
			apply_handle_truncate(s);
			break;

		case LOGICAL_REP_MSG_RELATION:
			apply_handle_relation(s);
			break;

		case LOGICAL_REP_MSG_TYPE:
			apply_handle_type(s);
			break;

		case LOGICAL_REP_MSG_ORIGIN:
			apply_handle_origin(s);
			break;

		case LOGICAL_REP_MSG_MESSAGE:

			/*
			 * Logical replication does not use generic logical messages yet.
			 * Although, it could be used by other applications that use this
			 * output plugin.
			 */
			break;

		case LOGICAL_REP_MSG_STREAM_START:
			apply_handle_stream_start(s);
			break;

		case LOGICAL_REP_MSG_STREAM_STOP:
			apply_handle_stream_stop(s);
			break;

		case LOGICAL_REP_MSG_STREAM_ABORT:
			apply_handle_stream_abort(s);
			break;

		case LOGICAL_REP_MSG_STREAM_COMMIT:
			apply_handle_stream_commit(s);
			break;

		case LOGICAL_REP_MSG_BEGIN_PREPARE:
			apply_handle_begin_prepare(s);
			break;

		case LOGICAL_REP_MSG_PREPARE:
			apply_handle_prepare(s);
			break;

		case LOGICAL_REP_MSG_COMMIT_PREPARED:
			apply_handle_commit_prepared(s);
			break;

		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
			apply_handle_rollback_prepared(s);
			break;

		case LOGICAL_REP_MSG_STREAM_PREPARE:
			apply_handle_stream_prepare(s);
			break;

		default:
			ereport(ERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
	}

	/* Reset the current command */
	apply_error_callback_arg.command = saved_command;
}
3902 :
/*
 * Figure out which write/flush positions to report to the walsender process.
 *
 * We can't simply report back the last LSN the walsender sent us because the
 * local transaction might not yet be flushed to disk locally.  Instead we
 * build a list that associates local with remote LSNs for every commit.  When
 * reporting back the flush position to the sender we iterate that list and
 * check which entries on it are already locally flushed.  Those we can report
 * as having been flushed.
 *
 * On return, *write is the latest remote LSN we have locally written
 * (InvalidXLogRecPtr if lsn_mapping is empty) and *flush is the latest remote
 * LSN whose local transaction is durably flushed (InvalidXLogRecPtr if none
 * is).  *have_pending_txes is set true if there are outstanding transactions
 * that still need to be flushed.
 */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
				   bool *have_pending_txes)
{
	dlist_mutable_iter iter;
	XLogRecPtr	local_flush = GetFlushRecPtr(NULL);

	*write = InvalidXLogRecPtr;
	*flush = InvalidXLogRecPtr;

	/* Mutable iteration: flushed entries are removed as we go. */
	dlist_foreach_modify(iter, &lsn_mapping)
	{
		FlushPosition *pos =
			dlist_container(FlushPosition, node, iter.cur);

		*write = pos->remote_end;

		if (pos->local_end <= local_flush)
		{
			/* Locally flushed already; report it and drop the entry. */
			*flush = pos->remote_end;
			dlist_delete(iter.cur);
			pfree(pos);
		}
		else
		{
			/*
			 * Don't want to uselessly iterate over the rest of the list which
			 * could potentially be long.  Instead get the last element and
			 * grab the write position from there.
			 */
			pos = dlist_tail_element(FlushPosition, node,
									 &lsn_mapping);
			*write = pos->remote_end;
			*have_pending_txes = true;
			return;
		}
	}

	/* List fully drained (or empty): pending only if something remains. */
	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
}
3956 :
3957 : /*
3958 : * Store current remote/local lsn pair in the tracking list.
3959 : */
3960 : void
3961 577 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3962 : {
3963 : FlushPosition *flushpos;
3964 :
3965 : /*
3966 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3967 : * by the leader apply worker.
3968 : */
3969 577 : if (am_parallel_apply_worker())
3970 19 : return;
3971 :
3972 : /* Need to do this in permanent context */
3973 558 : MemoryContextSwitchTo(ApplyContext);
3974 :
3975 : /* Track commit lsn */
3976 558 : flushpos = palloc_object(FlushPosition);
3977 558 : flushpos->local_end = local_lsn;
3978 558 : flushpos->remote_end = remote_lsn;
3979 :
3980 558 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3981 558 : MemoryContextSwitchTo(ApplyMessageContext);
3982 : }
3983 :
3984 :
3985 : /* Update statistics of the worker. */
3986 : static void
3987 214543 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3988 : {
3989 214543 : MyLogicalRepWorker->last_lsn = last_lsn;
3990 214543 : MyLogicalRepWorker->last_send_time = send_time;
3991 214543 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3992 214543 : if (reply)
3993 : {
3994 1865 : MyLogicalRepWorker->reply_lsn = last_lsn;
3995 1865 : MyLogicalRepWorker->reply_time = send_time;
3996 : }
3997 214543 : }
3998 :
/*
 * Apply main loop.
 *
 * Receives messages from the walsender connection starting at
 * 'last_received' and dispatches them until the stream ends.  The outer loop
 * iterates once per wait; an inner loop drains all data available without
 * blocking.  Between waits we send feedback, process config reloads, and
 * handle subscription/sync housekeeping.  Only returns (after ending the
 * stream) when the publisher closes the data stream.
 */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;
	TimeLineID	tli;
	ErrorContextCallback errcallback;
	RetainDeadTuplesData rdt_data = {0};

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/*
	 * This memory context is used for per-stream data when the streaming mode
	 * is enabled.  This context is reset on each stream stop.
	 */
	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
													"LogicalStreamingContext",
													ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/*
	 * Push apply error context callback.  Fields will be filled while
	 * applying a change.
	 */
	errcallback.callback = apply_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;
	apply_error_context_stack = error_context_stack;

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					/* No more data available right now. */
					break;
				}
				else if (len < 0)
				{
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					if (ConfigReloadPending)
					{
						ConfigReloadPending = false;
						ProcessConfigFile(PGC_SIGHUP);
					}

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					rdt_data.last_recv_time = last_recv_timestamp;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					initReadOnlyStringInfo(&s, buf, len);

					c = pq_getmsgbyte(&s);

					if (c == PqReplMsg_WALData)
					{
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						apply_dispatch(&s);

						maybe_advance_nonremovable_xid(&rdt_data, false);
					}
					else if (c == PqReplMsg_Keepalive)
					{
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						send_feedback(last_received, reply_requested, false);

						maybe_advance_nonremovable_xid(&rdt_data, false);

						UpdateWorkerStats(last_received, timestamp, true);
					}
					else if (c == PqReplMsg_PrimaryStatusUpdate)
					{
						rdt_data.remote_lsn = pq_getmsgint64(&s);
						rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
						rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
						rdt_data.reply_time = pq_getmsgint64(&s);

						/*
						 * This should never happen, see
						 * ProcessStandbyPSRequestMessage.  But if it happens
						 * due to a bug, we don't want to proceed as it can
						 * incorrectly advance oldest_nonremovable_xid.
						 */
						if (!XLogRecPtrIsValid(rdt_data.remote_lsn))
							elog(ERROR, "cannot get the latest WAL position from the publisher");

						maybe_advance_nonremovable_xid(&rdt_data, true);

						UpdateWorkerStats(last_received, rdt_data.reply_time, false);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		/* Reset the timestamp if no message was received */
		rdt_data.last_recv_time = 0;

		maybe_advance_nonremovable_xid(&rdt_data, false);

		if (!in_remote_transaction && !in_streamed_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/*
			 * Process any relations that are being synchronized in parallel
			 * and any newly added tables or sequences.
			 */
			ProcessSyncingRelations(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextReset(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
			break;

		/*
		 * Wait for more data or latch.  If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message).  Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		/*
		 * Ensure to wake up when it's possible to advance the non-removable
		 * transaction ID, or when the retention duration may have exceeded
		 * max_retention_duration.
		 */
		if (MySubscription->retentionactive)
		{
			if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
				rdt_data.xid_advance_interval)
				wait_time = Min(wait_time, rdt_data.xid_advance_interval);
			else if (MySubscription->maxretention > 0)
				wait_time = Min(wait_time, MySubscription->maxretention);
		}

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new.  If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server.  Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the primary anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from primary has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errcode(ERRCODE_CONNECTION_FAILURE),
							 errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);

			maybe_advance_nonremovable_xid(&rdt_data, false);

			/*
			 * Force reporting to ensure long idle periods don't lead to
			 * arbitrarily delayed stats.  Stats can only be reported outside
			 * of (implicit or explicit) transactions.  That shouldn't lead to
			 * stats being delayed for long, because transactions are either
			 * sent as a whole on commit or streamed.  Streamed transactions
			 * are spilled to disk and applied on commit.
			 */
			if (!IsTransactionState())
				pgstat_report_stat(true);
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;
	apply_error_context_stack = error_context_stack;

	/* All done */
	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
4311 :
/*
 * Send a Standby Status Update message to server.
 *
 * 'recvpos' is the latest LSN we've received data to; 'force' is set if we
 * need to send a response to avoid timeouts; 'requestReply' asks the
 * publisher for an immediate reply (used for pings).
 *
 * Uses function-local static state to remember the last reported positions
 * and send time, so redundant updates are suppressed unless forced or the
 * status interval has elapsed.
 */
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
	static StringInfo reply_message = NULL;
	static TimestampTz send_time = 0;

	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
	static XLogRecPtr last_writepos = InvalidXLogRecPtr;

	XLogRecPtr	writepos;
	XLogRecPtr	flushpos;
	TimestampTz now;
	bool		have_pending_txes;

	/*
	 * If the user doesn't want status to be reported to the publisher, be
	 * sure to exit before doing anything at all.
	 */
	if (!force && wal_receiver_status_interval <= 0)
		return;

	/* It's legal to not pass a recvpos */
	if (recvpos < last_recvpos)
		recvpos = last_recvpos;

	get_flush_position(&writepos, &flushpos, &have_pending_txes);

	/*
	 * No outstanding transactions to flush, we can report the latest received
	 * position.  This is important for synchronous replication.
	 */
	if (!have_pending_txes)
		flushpos = writepos = recvpos;

	/* Never let reported positions move backwards. */
	if (writepos < last_writepos)
		writepos = last_writepos;

	if (flushpos < last_flushpos)
		flushpos = last_flushpos;

	now = GetCurrentTimestamp();

	/* if we've already reported everything we're good */
	if (!force &&
		writepos == last_writepos &&
		flushpos == last_flushpos &&
		!TimestampDifferenceExceeds(send_time, now,
									wal_receiver_status_interval * 1000))
		return;
	send_time = now;

	/* Lazily create the reply buffer in the long-lived ApplyContext. */
	if (!reply_message)
	{
		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);

		reply_message = makeStringInfo();
		MemoryContextSwitchTo(oldctx);
	}
	else
		resetStringInfo(reply_message);

	pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
	pq_sendint64(reply_message, recvpos);	/* write */
	pq_sendint64(reply_message, flushpos);	/* flush */
	pq_sendint64(reply_message, writepos);	/* apply */
	pq_sendint64(reply_message, now);	/* sendTime */
	pq_sendbyte(reply_message, requestReply);	/* replyRequested */

	elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
		 force,
		 LSN_FORMAT_ARGS(recvpos),
		 LSN_FORMAT_ARGS(writepos),
		 LSN_FORMAT_ARGS(flushpos));

	walrcv_send(LogRepWorkerWalRcvConn,
				reply_message->data, reply_message->len);

	/* Remember the high-water marks we just reported. */
	if (recvpos > last_recvpos)
		last_recvpos = recvpos;
	if (writepos > last_writepos)
		last_writepos = writepos;
	if (flushpos > last_flushpos)
		last_flushpos = flushpos;
}
4402 :
4403 : /*
4404 : * Attempt to advance the non-removable transaction ID.
4405 : *
4406 : * See comments atop worker.c for details.
4407 : */
4408 : static void
4409 240609 : maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
4410 : bool status_received)
4411 : {
4412 240609 : if (!can_advance_nonremovable_xid(rdt_data))
4413 233611 : return;
4414 :
4415 6998 : process_rdt_phase_transition(rdt_data, status_received);
4416 : }
4417 :
4418 : /*
4419 : * Preliminary check to determine if advancing the non-removable transaction ID
4420 : * is allowed.
4421 : */
4422 : static bool
4423 240609 : can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
4424 : {
4425 : /*
4426 : * It is sufficient to manage non-removable transaction ID for a
4427 : * subscription by the main apply worker to detect update_deleted reliably
4428 : * even for table sync or parallel apply workers.
4429 : */
4430 240609 : if (!am_leader_apply_worker())
4431 3392 : return false;
4432 :
4433 : /* No need to advance if retaining dead tuples is not required */
4434 237217 : if (!MySubscription->retaindeadtuples)
4435 230219 : return false;
4436 :
4437 6998 : return true;
4438 : }
4439 :
4440 : /*
4441 : * Process phase transitions during the non-removable transaction ID
4442 : * advancement. See comments atop worker.c for details of the transition.
4443 : */
4444 : static void
4445 11029 : process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
4446 : bool status_received)
4447 : {
4448 11029 : switch (rdt_data->phase)
4449 : {
4450 415 : case RDT_GET_CANDIDATE_XID:
4451 415 : get_candidate_xid(rdt_data);
4452 415 : break;
4453 3946 : case RDT_REQUEST_PUBLISHER_STATUS:
4454 3946 : request_publisher_status(rdt_data);
4455 3946 : break;
4456 6582 : case RDT_WAIT_FOR_PUBLISHER_STATUS:
4457 6582 : wait_for_publisher_status(rdt_data, status_received);
4458 6582 : break;
4459 84 : case RDT_WAIT_FOR_LOCAL_FLUSH:
4460 84 : wait_for_local_flush(rdt_data);
4461 84 : break;
4462 1 : case RDT_STOP_CONFLICT_INFO_RETENTION:
4463 1 : stop_conflict_info_retention(rdt_data);
4464 1 : break;
4465 1 : case RDT_RESUME_CONFLICT_INFO_RETENTION:
4466 1 : resume_conflict_info_retention(rdt_data);
4467 0 : break;
4468 : }
4469 11028 : }
4470 :
4471 : /*
4472 : * Workhorse for the RDT_GET_CANDIDATE_XID phase.
4473 : */
4474 : static void
4475 415 : get_candidate_xid(RetainDeadTuplesData *rdt_data)
4476 : {
4477 : TransactionId oldest_running_xid;
4478 : TimestampTz now;
4479 :
4480 : /*
4481 : * Use last_recv_time when applying changes in the loop to avoid
4482 : * unnecessary system time retrieval. If last_recv_time is not available,
4483 : * obtain the current timestamp.
4484 : */
4485 415 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4486 :
4487 : /*
4488 : * Compute the candidate_xid and request the publisher status at most once
4489 : * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4490 : * details on how this value is dynamically adjusted. This is to avoid
4491 : * using CPU and network resources without making much progress.
4492 : */
4493 415 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4494 : rdt_data->xid_advance_interval))
4495 323 : return;
4496 :
4497 : /*
4498 : * Immediately update the timer, even if the function returns later
4499 : * without setting candidate_xid due to inactivity on the subscriber. This
4500 : * avoids frequent calls to GetOldestActiveTransactionId.
4501 : */
4502 92 : rdt_data->candidate_xid_time = now;
4503 :
4504 : /*
4505 : * Consider transactions in the current database, as only dead tuples from
4506 : * this database are required for conflict detection.
4507 : */
4508 92 : oldest_running_xid = GetOldestActiveTransactionId(false, false);
4509 :
4510 : /*
4511 : * Oldest active transaction ID (oldest_running_xid) can't be behind any
4512 : * of its previously computed value.
4513 : */
4514 : Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4515 : oldest_running_xid));
4516 :
4517 : /* Return if the oldest_nonremovable_xid cannot be advanced */
4518 92 : if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4519 : oldest_running_xid))
4520 : {
4521 47 : adjust_xid_advance_interval(rdt_data, false);
4522 47 : return;
4523 : }
4524 :
4525 45 : adjust_xid_advance_interval(rdt_data, true);
4526 :
4527 45 : rdt_data->candidate_xid = oldest_running_xid;
4528 45 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4529 :
4530 : /* process the next phase */
4531 45 : process_rdt_phase_transition(rdt_data, false);
4532 : }
4533 :
4534 : /*
4535 : * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4536 : */
4537 : static void
4538 3946 : request_publisher_status(RetainDeadTuplesData *rdt_data)
4539 : {
4540 : static StringInfo request_message = NULL;
4541 :
4542 3946 : if (!request_message)
4543 : {
4544 11 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4545 :
4546 11 : request_message = makeStringInfo();
4547 11 : MemoryContextSwitchTo(oldctx);
4548 : }
4549 : else
4550 3935 : resetStringInfo(request_message);
4551 :
4552 : /*
4553 : * Send the current time to update the remote walsender's latest reply
4554 : * message received time.
4555 : */
4556 3946 : pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
4557 3946 : pq_sendint64(request_message, GetCurrentTimestamp());
4558 :
4559 3946 : elog(DEBUG2, "sending publisher status request message");
4560 :
4561 : /* Send a request for the publisher status */
4562 3946 : walrcv_send(LogRepWorkerWalRcvConn,
4563 : request_message->data, request_message->len);
4564 :
4565 3946 : rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
4566 :
4567 : /*
4568 : * Skip calling maybe_advance_nonremovable_xid() since further transition
4569 : * is possible only once we receive the publisher status message.
4570 : */
4571 3946 : }
4572 :
4573 : /*
4574 : * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
4575 : */
static void
wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
						  bool status_received)
{
	/*
	 * Return if we have requested but not yet received the publisher status.
	 */
	if (!status_received)
		return;

	/*
	 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
	 * retaining conflict information for this worker.
	 */
	if (should_stop_conflict_info_retention(rdt_data))
	{
		rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
		return;
	}

	/*
	 * On the first status reply of this cycle, latch the publisher's next
	 * XID as the boundary we must wait for; subsequent replies only update
	 * remote_oldestxid below.
	 */
	if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
		rdt_data->remote_wait_for = rdt_data->remote_nextxid;

	/*
	 * Check if all remote concurrent transactions that were active at the
	 * first status request have now completed. If completed, proceed to the
	 * next phase; otherwise, continue checking the publisher status until
	 * these transactions finish.
	 *
	 * It's possible that transactions in the commit phase during the last
	 * cycle have now finished committing, but remote_oldestxid remains older
	 * than remote_wait_for. This can happen if some old transaction came in
	 * the commit phase when we requested status in this cycle. We do not
	 * handle this case explicitly as it's rare and the benefit doesn't
	 * justify the required complexity. Tracking would require either caching
	 * all xids at the publisher or sending them to subscribers. The condition
	 * will resolve naturally once the remaining transactions are finished.
	 *
	 * Directly advancing the non-removable transaction ID is possible if
	 * there are no activities on the publisher since the last advancement
	 * cycle. However, it requires maintaining two fields, last_remote_nextxid
	 * and last_remote_lsn, within the structure for comparison with the
	 * current cycle's values. Considering the minimal cost of continuing in
	 * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
	 * advance the transaction ID here.
	 */
	if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
										  rdt_data->remote_oldestxid))
		rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
	else
		rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;

	/* process the next phase */
	process_rdt_phase_transition(rdt_data, false);
}
4631 :
4632 : /*
4633 : * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
4634 : */
static void
wait_for_local_flush(RetainDeadTuplesData *rdt_data)
{
	Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) &&
		   TransactionIdIsValid(rdt_data->candidate_xid));

	/*
	 * We expect the publisher and subscriber clocks to be in sync using time
	 * sync service like NTP. Otherwise, we will advance this worker's
	 * oldest_nonremovable_xid prematurely, leading to the removal of rows
	 * required to detect update_deleted reliably. This check primarily
	 * addresses scenarios where the publisher's clock falls behind; if the
	 * publisher's clock is ahead, subsequent transactions will naturally bear
	 * later commit timestamps, conforming to the design outlined atop
	 * worker.c.
	 *
	 * XXX Consider waiting for the publisher's clock to catch up with the
	 * subscriber's before proceeding to the next phase.
	 */
	if (TimestampDifferenceExceeds(rdt_data->reply_time,
								   rdt_data->candidate_xid_time, 0))
		ereport(ERROR,
				errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
				errdetail_internal("The clock on the publisher is behind that of the subscriber."));

	/*
	 * Do not attempt to advance the non-removable transaction ID when table
	 * sync is in progress. During this time, changes from a single
	 * transaction may be applied by multiple table sync workers corresponding
	 * to the target tables. So, it's necessary for all table sync workers to
	 * apply and flush the corresponding changes before advancing the
	 * transaction ID, otherwise, dead tuples that are still needed for
	 * conflict detection in table sync workers could be removed prematurely.
	 * However, confirming the apply and flush progress across all table sync
	 * workers is complex and not worth the effort, so we simply return if not
	 * all tables are in the READY state.
	 *
	 * Advancing the transaction ID is necessary even when no tables are
	 * currently subscribed, to avoid retaining dead tuples unnecessarily.
	 * While it might seem safe to skip all phases and directly assign
	 * candidate_xid to oldest_nonremovable_xid during the
	 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
	 * concurrently add tables to the subscription, the apply worker may not
	 * process invalidations in time. Consequently,
	 * HasSubscriptionTablesCached() might miss the new tables, leading to
	 * premature advancement of oldest_nonremovable_xid.
	 *
	 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
	 * invalidations are guaranteed to be processed before applying changes
	 * from newly added tables while waiting for the local flush to reach
	 * remote_lsn.
	 *
	 * Additionally, even if we check for subscription tables during
	 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
	 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
	 * subscription tables at this stage to prevent unnecessary tuple
	 * retention.
	 */
	if (HasSubscriptionTablesCached() && !AllTablesyncsReady())
	{
		TimestampTz now;

		/* Prefer the cached receive time to avoid an extra clock read. */
		now = rdt_data->last_recv_time
			? rdt_data->last_recv_time : GetCurrentTimestamp();

		/*
		 * Record the time spent waiting for table sync, it is needed for the
		 * timeout check in should_stop_conflict_info_retention().
		 */
		rdt_data->table_sync_wait_time =
			TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);

		return;
	}

	/*
	 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
	 * retaining conflict information for this worker.
	 */
	if (should_stop_conflict_info_retention(rdt_data))
	{
		rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
		return;
	}

	/*
	 * Update and check the remote flush position if we are applying changes
	 * in a loop. This is done at most once per WalWriterDelay to avoid
	 * performing costly operations in get_flush_position() too frequently
	 * during change application.
	 */
	if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
		TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
								   rdt_data->last_recv_time, WalWriterDelay))
	{
		XLogRecPtr	writepos;
		XLogRecPtr	flushpos;
		bool		have_pending_txes;

		/* Fetch the latest remote flush position */
		get_flush_position(&writepos, &flushpos, &have_pending_txes);

		if (flushpos > last_flushpos)
			last_flushpos = flushpos;

		rdt_data->flushpos_update_time = rdt_data->last_recv_time;
	}

	/* Return to wait for the changes to be applied */
	if (last_flushpos < rdt_data->remote_lsn)
		return;

	/*
	 * Reaching this point implies should_stop_conflict_info_retention()
	 * returned false earlier, meaning that the most recent duration for
	 * advancing the non-removable transaction ID is within the
	 * max_retention_duration or max_retention_duration is set to 0.
	 *
	 * Therefore, if conflict info retention was previously stopped due to a
	 * timeout, it is now safe to resume retention.
	 */
	if (!MySubscription->retentionactive)
	{
		rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
		return;
	}

	/*
	 * Reaching here means the remote WAL position has been received, and all
	 * transactions up to that position on the publisher have been applied and
	 * flushed locally. So, we can advance the non-removable transaction ID.
	 */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
		 LSN_FORMAT_ARGS(rdt_data->remote_lsn),
		 rdt_data->candidate_xid);

	/* Notify launcher to update the xmin of the conflict slot */
	ApplyLauncherWakeup();

	reset_retention_data_fields(rdt_data);

	/* process the next phase */
	process_rdt_phase_transition(rdt_data, false);
}
4783 :
4784 : /*
4785 : * Check whether conflict information retention should be stopped due to
4786 : * exceeding the maximum wait time (max_retention_duration).
4787 : *
4788 : * If retention should be stopped, return true. Otherwise, return false.
4789 : */
4790 : static bool
4791 4027 : should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4792 : {
4793 : TimestampTz now;
4794 :
4795 : Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4796 : Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
4797 : rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
4798 :
4799 4027 : if (!MySubscription->maxretention)
4800 4026 : return false;
4801 :
4802 : /*
4803 : * Use last_recv_time when applying changes in the loop to avoid
4804 : * unnecessary system time retrieval. If last_recv_time is not available,
4805 : * obtain the current timestamp.
4806 : */
4807 1 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4808 :
4809 : /*
4810 : * Return early if the wait time has not exceeded the configured maximum
4811 : * (max_retention_duration). Time spent waiting for table synchronization
4812 : * is excluded from this calculation, as it occurs infrequently.
4813 : */
4814 1 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4815 1 : MySubscription->maxretention +
4816 1 : rdt_data->table_sync_wait_time))
4817 0 : return false;
4818 :
4819 1 : return true;
4820 : }
4821 :
4822 : /*
4823 : * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
4824 : */
static void
stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
{
	/* Stop retention if not yet */
	if (MySubscription->retentionactive)
	{
		/*
		 * If the retention status cannot be updated (e.g., due to active
		 * transaction), skip further processing to avoid inconsistent
		 * retention behavior.
		 */
		if (!update_retention_status(false))
			return;

		/* Withdraw our advertised XID so dead tuples can be removed again. */
		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
		MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
		SpinLockRelease(&MyLogicalRepWorker->relmutex);

		ereport(LOG,
				errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
					   MySubscription->name),
				errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
	}

	Assert(!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid));

	/*
	 * If retention has been stopped, reset to the initial phase to retry
	 * resuming retention. This reset is required to recalculate the current
	 * wait time and resume retention if the time falls within
	 * max_retention_duration.
	 */
	reset_retention_data_fields(rdt_data);
}
4859 :
4860 : /*
4861 : * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
4862 : */
static void
resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
{
	/* We can't resume retention without updating retention status. */
	if (!update_retention_status(true))
		return;

	ereport(LOG,
			errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
				   MySubscription->name),
			MySubscription->maxretention
			? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
			: errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));

	/*
	 * Restart the worker to let the launcher initialize
	 * oldest_nonremovable_xid at startup.
	 *
	 * While it's technically possible to derive this value on-the-fly using
	 * the conflict detection slot's xmin, doing so risks a race condition:
	 * the launcher might clean slot.xmin just after retention resumes. This
	 * would make oldest_nonremovable_xid unreliable, especially during xid
	 * wraparound.
	 *
	 * Although this can be prevented by introducing heavy weight locking, the
	 * complexity it will bring doesn't seem worthwhile given how rarely
	 * retention is resumed.
	 */
	apply_worker_exit();
}
4893 :
4894 : /*
4895 : * Updates pg_subscription.subretentionactive to the given value within a
4896 : * new transaction.
4897 : *
4898 : * If already inside an active transaction, skips the update and returns
4899 : * false.
4900 : *
4901 : * Returns true if the update is successfully performed.
4902 : */
static bool
update_retention_status(bool active)
{
	/*
	 * Do not update the catalog during an active transaction. The transaction
	 * may be started during change application, leading to a possible
	 * rollback of catalog updates if the application fails subsequently.
	 */
	if (IsTransactionState())
		return false;

	StartTransactionCommand();

	/*
	 * Updating pg_subscription might involve TOAST table access, so ensure we
	 * have a valid snapshot.
	 */
	PushActiveSnapshot(GetTransactionSnapshot());

	/* Update pg_subscription.subretentionactive */
	UpdateDeadTupleRetentionStatus(MySubscription->oid, active);

	PopActiveSnapshot();
	CommitTransactionCommand();

	/* Notify launcher to update the conflict slot */
	ApplyLauncherWakeup();

	/* Keep the in-memory subscription cache in sync with the catalog. */
	MySubscription->retentionactive = active;

	return true;
}
4935 :
4936 : /*
4937 : * Reset all data fields of RetainDeadTuplesData except those used to
4938 : * determine the timing for the next round of transaction ID advancement. We
4939 : * can even use flushpos_update_time in the next round to decide whether to get
4940 : * the latest flush position.
4941 : */
4942 : static void
4943 42 : reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
4944 : {
4945 42 : rdt_data->phase = RDT_GET_CANDIDATE_XID;
4946 42 : rdt_data->remote_lsn = InvalidXLogRecPtr;
4947 42 : rdt_data->remote_oldestxid = InvalidFullTransactionId;
4948 42 : rdt_data->remote_nextxid = InvalidFullTransactionId;
4949 42 : rdt_data->reply_time = 0;
4950 42 : rdt_data->remote_wait_for = InvalidFullTransactionId;
4951 42 : rdt_data->candidate_xid = InvalidTransactionId;
4952 42 : rdt_data->table_sync_wait_time = 0;
4953 42 : }
4954 :
4955 : /*
4956 : * Adjust the interval for advancing non-removable transaction IDs.
4957 : *
4958 : * If there is no activity on the node or retention has been stopped, we
4959 : * progressively double the interval used to advance non-removable transaction
4960 : * ID. This helps conserve CPU and network resources when there's little benefit
4961 : * to frequent updates.
4962 : *
4963 : * The interval is capped by the lowest of the following:
4964 : * - wal_receiver_status_interval (if set and retention is active),
4965 : * - a default maximum of 3 minutes,
4966 : * - max_retention_duration (if retention is active).
4967 : *
4968 : * This ensures the interval never exceeds the retention boundary, even if other
4969 : * limits are higher. Once activity resumes on the node and the retention is
4970 : * active, the interval is reset to lesser of 100ms and max_retention_duration,
4971 : * allowing timely advancement of non-removable transaction ID.
4972 : *
4973 : * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4974 : * consider the other interval or a separate GUC if the need arises.
4975 : */
static void
adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
{
	/* Interval already initialized and no progress: back off exponentially. */
	if (rdt_data->xid_advance_interval && !new_xid_found)
	{
		int			max_interval = wal_receiver_status_interval
			? wal_receiver_status_interval * 1000
			: MAX_XID_ADVANCE_INTERVAL;

		/*
		 * No new transaction ID has been assigned since the last check, so
		 * double the interval, but not beyond the maximum allowable value.
		 */
		rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
											 max_interval);
	}
	else if (rdt_data->xid_advance_interval &&
			 !MySubscription->retentionactive)
	{
		/*
		 * Retention has been stopped, so double the interval, capped at a
		 * maximum of 3 minutes. The wal_receiver_status_interval is
		 * intentionally not used as an upper bound, since the likelihood of
		 * retention resuming is lower than that of general activity resuming.
		 */
		rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
											 MAX_XID_ADVANCE_INTERVAL);
	}
	else
	{
		/*
		 * A new transaction ID was found or the interval is not yet
		 * initialized, so set the interval to the minimum value.
		 */
		rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
	}

	/*
	 * Ensure the wait time remains within the maximum retention time limit
	 * when retention is active. Skip this cap when maxretention is zero,
	 * which means unlimited retention (no timeout).
	 */
	if (MySubscription->retentionactive && MySubscription->maxretention > 0)
		rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
											 MySubscription->maxretention);
}
5022 :
5023 : /*
5024 : * Exit routine for apply workers due to subscription parameter changes.
5025 : */
5026 : static void
5027 48 : apply_worker_exit(void)
5028 : {
5029 48 : if (am_parallel_apply_worker())
5030 : {
5031 : /*
5032 : * Don't stop the parallel apply worker as the leader will detect the
5033 : * subscription parameter change and restart logical replication later
5034 : * anyway. This also prevents the leader from reporting errors when
5035 : * trying to communicate with a stopped parallel apply worker, which
5036 : * would accidentally disable subscriptions if disable_on_error was
5037 : * set.
5038 : */
5039 0 : return;
5040 : }
5041 :
5042 : /*
5043 : * Reset the last-start time for this apply worker so that the launcher
5044 : * will restart it without waiting for wal_retrieve_retry_interval if the
5045 : * subscription is still active, and so that we won't leak that hash table
5046 : * entry if it isn't.
5047 : */
5048 48 : if (am_leader_apply_worker())
5049 48 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5050 :
5051 48 : proc_exit(0);
5052 : }
5053 :
5054 : /*
5055 : * Reread subscription info if needed.
5056 : *
5057 : * For significant changes, we react by exiting the current process; a new
5058 : * one will be launched afterwards if needed.
5059 : */
void
maybe_reread_subscription(void)
{
	Subscription *newsub;
	bool		started_tx = false;

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);

	if (newsub)
	{
		/* Keep the new subscription's memory alive beyond this transaction. */
		MemoryContextSetParent(newsub->cxt, ApplyContext);
	}
	else
	{
		/*
		 * Exit if the subscription was removed. This normally should not
		 * happen as the worker gets killed during DROP SUBSCRIPTION.
		 */
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
						MySubscription->name)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (am_leader_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

		proc_exit(0);
	}

	/* Exit if the subscription was disabled. */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
						MySubscription->name)));

		apply_worker_exit();
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/* two-phase cannot be altered while the worker is running */
	Assert(newsub->twophasestate == MySubscription->twophasestate);

	/*
	 * Exit if any parameter that affects the remote connection was changed.
	 * The launcher will start a new worker but note that the parallel apply
	 * worker won't restart if the streaming option's value is changed from
	 * 'parallel' to any other value or the server decides not to stream the
	 * in-progress transaction.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
		strcmp(newsub->name, MySubscription->name) != 0 ||
		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
		newsub->binary != MySubscription->binary ||
		newsub->stream != MySubscription->stream ||
		newsub->passwordrequired != MySubscription->passwordrequired ||
		strcmp(newsub->origin, MySubscription->origin) != 0 ||
		newsub->owner != MySubscription->owner ||
		!equal(newsub->publications, MySubscription->publications))
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
							MySubscription->name)));
		else
			ereport(LOG,
					(errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
							MySubscription->name)));

		apply_worker_exit();
	}

	/*
	 * Exit if the subscription owner's superuser privileges have been
	 * revoked.
	 */
	if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
						   MySubscription->name));
		else
			ereport(LOG,
					errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
						   MySubscription->name));

		apply_worker_exit();
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	MemoryContextDelete(MySubscription->cxt);
	MySubscription = newsub;

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Change wal_receiver_timeout according to the user's wishes */
	set_wal_receiver_timeout();

	if (started_tx)
		CommitTransactionCommand();

	MySubscriptionValid = true;
}
5186 :
5187 : /*
5188 : * Change wal_receiver_timeout to MySubscription->walrcvtimeout.
     : *
     : * Called at worker startup (InitializeLogRepWorker) and after re-reading
     : * the subscription (maybe_reread_subscription), so the session-level GUC
     : * always tracks the subscription's current option value.
5189 : */
5190 : static void
5191 585 : set_wal_receiver_timeout(void)
5192 : {
5193 : bool parsed;
5194 : int val;
5195 585 : int prev_timeout = wal_receiver_timeout;
5196 :
5197 : /*
5198 : * Set the wal_receiver_timeout GUC to MySubscription->walrcvtimeout,
5199 : * which comes from the subscription's wal_receiver_timeout option. If the
5200 : * value is -1, reset the GUC to its default, meaning it will inherit from
5201 : * the server config, command line, or role/database settings.
5202 : */
5203 585 : parsed = parse_int(MySubscription->walrcvtimeout, &val, 0, NULL);
5204 585 : if (parsed && val == -1)
5205 585 : SetConfigOption("wal_receiver_timeout", NULL,
5206 : PGC_BACKEND, PGC_S_SESSION);
5207 : else
5208 0 : SetConfigOption("wal_receiver_timeout", MySubscription->walrcvtimeout,
5209 : PGC_BACKEND, PGC_S_SESSION);
5210 :
5211 : /*
5212 : * Log the wal_receiver_timeout setting (in milliseconds) as a debug
5213 : * message when it changes, to verify it was set correctly.
5214 : */
5215 585 : if (prev_timeout != wal_receiver_timeout)
5216 0 : elog(DEBUG1, "logical replication worker for subscription \"%s\" wal_receiver_timeout: %d ms",
5217 : MySubscription->name, wal_receiver_timeout);
5218 585 : }
5219 :
5220 : /*
5221 : * Callback from subscription syscache invalidation. Also needed for server or
5222 : * user mapping invalidation, which can change the connection information for
5223 : * subscriptions that connect using a server object.
     : *
     : * Only marks the cached subscription as stale; the arg, cacheid and
     : * hashvalue are ignored because every invalidation is handled the same
     : * way (a full re-read when MySubscriptionValid is next checked).
5224 : */
5225 : static void
5226 95 : subscription_change_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
5227 : {
5228 95 : MySubscriptionValid = false;
5229 : }
5230 :
5231 : /*
5232 : * subxact_info_write
5233 : * Store information about subxacts for a toplevel transaction.
5234 : *
5235 : * For each subxact we store offset of its first change in the main file.
5236 : * The file is always over-written as a whole.
5237 : *
5238 : * XXX We should only store subxacts that were not aborted yet.
5239 : */
5240 : static void
5241 371 : subxact_info_write(Oid subid, TransactionId xid)
5242 : {
5243 : char path[MAXPGPATH];
5244 : Size len;
5245 : BufFile *fd;
5246 :
5247 : Assert(TransactionIdIsValid(xid));
5248 :
5249 : /* construct the subxact filename */
5250 371 : subxact_filename(path, subid, xid);
5251 :
     : /*
     : * With no subxacts there is nothing to serialize; free the in-memory
     : * state and remove any stale file.  missing_ok = true because the file
     : * may never have been created for this transaction.
     : */
5252 : /* Delete the subxacts file, if exists. */
5253 371 : if (subxact_data.nsubxacts == 0)
5254 : {
5255 289 : cleanup_subxact_info();
5256 289 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5257 :
5258 289 : return;
5259 : }
5260 :
5261 : /*
5262 : * Create the subxact file if it not already created, otherwise open the
5263 : * existing file.
5264 : */
5265 82 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
5266 : true);
5267 82 : if (fd == NULL)
5268 8 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
5269 :
5270 82 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
5271 :
5272 : /* Write the subxact count and subxact info */
5273 82 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
5274 82 : BufFileWrite(fd, subxact_data.subxacts, len);
5275 :
5276 82 : BufFileClose(fd);
5277 :
5278 : /* free the memory allocated for subxact info */
5279 82 : cleanup_subxact_info();
5280 : }
5281 :
5282 : /*
5283 : * subxact_info_read
5284 : * Restore information about subxacts of a streamed transaction.
5285 : *
5286 : * Read information about subxacts into the structure subxact_data that can be
5287 : * used later.
     : *
     : * Expects subxact_data to be empty on entry (see Asserts); a missing file
     : * simply means the transaction has no subxact info yet.
5288 : */
5289 : static void
5290 343 : subxact_info_read(Oid subid, TransactionId xid)
5291 : {
5292 : char path[MAXPGPATH];
5293 : Size len;
5294 : BufFile *fd;
5295 : MemoryContext oldctx;
5296 :
5297 : Assert(!subxact_data.subxacts);
5298 : Assert(subxact_data.nsubxacts == 0);
5299 : Assert(subxact_data.nsubxacts_max == 0);
5300 :
5301 : /*
5302 : * If the subxact file doesn't exist that means we don't have any subxact
5303 : * info.
5304 : */
5305 343 : subxact_filename(path, subid, xid);
5306 343 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
5307 : true);
5308 343 : if (fd == NULL)
5309 264 : return;
5310 :
5311 : /* read number of subxact items */
5312 79 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
5313 :
5314 79 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
5315 :
     : /*
     : * A power-of-2 capacity lets subxact_info_add grow the array by simple
     : * doubling.
     : */
5316 : /* we keep the maximum as a power of 2 */
5317 79 : subxact_data.nsubxacts_max = 1 << pg_ceil_log2_32(subxact_data.nsubxacts);
5318 :
5319 : /*
5320 : * Allocate subxact information in the logical streaming context. We need
5321 : * this information during the complete stream so that we can add the sub
5322 : * transaction info to this. On stream stop we will flush this information
5323 : * to the subxact file and reset the logical streaming context.
5324 : */
5325 79 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
5326 79 : subxact_data.subxacts = palloc_array(SubXactInfo,
5327 : subxact_data.nsubxacts_max);
5328 79 : MemoryContextSwitchTo(oldctx);
5329 :
5330 79 : if (len > 0)
5331 79 : BufFileReadExact(fd, subxact_data.subxacts, len);
5332 :
5333 79 : BufFileClose(fd);
5334 : }
5335 :
5336 : /*
5337 : * subxact_info_add
5338 : * Add information about a subxact (offset in the main file).
     : *
     : * No-op when xid is the toplevel transaction, when it equals the last
     : * subxact seen, or when it is already recorded in the array.
5339 : */
5340 : static void
5341 102513 : subxact_info_add(TransactionId xid)
5342 : {
5343 102513 : SubXactInfo *subxacts = subxact_data.subxacts;
5344 : int64 i;
5345 :
5346 : /* We must have a valid top level stream xid and a stream fd. */
5347 : Assert(TransactionIdIsValid(stream_xid));
5348 : Assert(stream_fd != NULL);
5349 :
5350 : /*
5351 : * If the XID matches the toplevel transaction, we don't want to add it.
5352 : */
5353 102513 : if (stream_xid == xid)
5354 92389 : return;
5355 :
5356 : /*
5357 : * In most cases we're checking the same subxact as we've already seen in
5358 : * the last call, so make sure to ignore it (this change comes later).
5359 : */
5360 10124 : if (subxact_data.subxact_last == xid)
5361 10048 : return;
5362 :
5363 : /* OK, remember we're processing this XID. */
5364 76 : subxact_data.subxact_last = xid;
5365 :
5366 : /*
5367 : * Check if the transaction is already present in the array of subxact. We
5368 : * intentionally scan the array from the tail, because we're likely adding
5369 : * a change for the most recent subtransactions.
5370 : *
5371 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5372 : * would allow us to use binary search here.
5373 : */
5374 95 : for (i = subxact_data.nsubxacts; i > 0; i--)
5375 : {
5376 : /* found, so we're done */
5377 76 : if (subxacts[i - 1].xid == xid)
5378 57 : return;
5379 : }
5380 :
     : /*
     : * Grow the array as needed: start at a capacity of 128 entries and
     : * double each time it fills up (the capacity stays a power of 2, which
     : * matches what subxact_info_read computes).
     : */
5381 : /* This is a new subxact, so we need to add it to the array. */
5382 19 : if (subxact_data.nsubxacts == 0)
5383 : {
5384 : MemoryContext oldctx;
5385 :
5386 8 : subxact_data.nsubxacts_max = 128;
5387 :
5388 : /*
5389 : * Allocate this memory for subxacts in per-stream context, see
5390 : * subxact_info_read.
5391 : */
5392 8 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
5393 8 : subxacts = palloc_array(SubXactInfo, subxact_data.nsubxacts_max);
5394 8 : MemoryContextSwitchTo(oldctx);
5395 : }
5396 11 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
5397 : {
5398 10 : subxact_data.nsubxacts_max *= 2;
5399 10 : subxacts = repalloc_array(subxacts, SubXactInfo,
5400 : subxact_data.nsubxacts_max);
5401 : }
5402 :
5403 19 : subxacts[subxact_data.nsubxacts].xid = xid;
5404 :
5405 : /*
5406 : * Get the current offset of the stream file and store it as offset of
5407 : * this subxact.
5408 : */
5409 19 : BufFileTell(stream_fd,
5410 19 : &subxacts[subxact_data.nsubxacts].fileno,
5411 19 : &subxacts[subxact_data.nsubxacts].offset);
5412 :
5413 19 : subxact_data.nsubxacts++;
5414 19 : subxact_data.subxacts = subxacts;
5415 : }
5416 :
5417 : /* format filename for file containing the info about subxacts */
     : /*
     : * The name combines the subscription OID and toplevel XID so that
     : * different workers handling remote transactions with the same XID do
     : * not collide (see the STREAMED TRANSACTIONS notes at the top of file).
     : */
5418 : static inline void
5419 745 : subxact_filename(char *path, Oid subid, TransactionId xid)
5420 : {
5421 745 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5422 : }
5423 :
5424 : /* format filename for file containing serialized changes */
     : /*
     : * Same naming scheme as subxact_filename: subscription OID plus toplevel
     : * XID, but with a ".changes" suffix.
     : */
5425 : static inline void
5426 437 : changes_filename(char *path, Oid subid, TransactionId xid)
5427 : {
5428 437 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5429 : }
5430 :
5431 : /*
5432 : * stream_cleanup_files
5433 : * Cleanup files for a subscription / toplevel transaction.
5434 : *
5435 : * Remove files with serialized changes and subxact info for a particular
5436 : * toplevel transaction. Each subscription has a separate set of files
5437 : * for any toplevel transaction.
     : *
     : * The changes file must exist (missing_ok = false), whereas the subxact
     : * file is optional (missing_ok = true) since it is created only when the
     : * transaction actually had subtransactions.
5438 : */
5439 : void
5440 31 : stream_cleanup_files(Oid subid, TransactionId xid)
5441 : {
5442 : char path[MAXPGPATH];
5443 :
5444 : /* Delete the changes file. */
5445 31 : changes_filename(path, subid, xid);
5446 31 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
5447 :
5448 : /* Delete the subxact file, if it exists. */
5449 31 : subxact_filename(path, subid, xid);
5450 31 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5451 : }
5452 :
5453 : /*
5454 : * stream_open_file
5455 : * Open a file that we'll use to serialize changes for a toplevel
5456 : * transaction.
5457 : *
5458 : * Open a file for streamed changes from a toplevel transaction identified
5459 : * by stream_xid (global variable). If it's the first chunk of streamed
5460 : * changes for this transaction, create the buffile, otherwise open the
5461 : * previously created file.
     : *
     : * On success the file handle is left in the global stream_fd; the Assert
     : * below guarantees no file is already open.
5462 : */
5463 : static void
5464 362 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
5465 : {
5466 : char path[MAXPGPATH];
5467 : MemoryContext oldcxt;
5468 :
5469 : Assert(OidIsValid(subid));
5470 : Assert(TransactionIdIsValid(xid));
5471 : Assert(stream_fd == NULL);
5472 :
5473 :
5474 362 : changes_filename(path, subid, xid);
5475 362 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5476 :
5477 : /*
5478 : * Create/open the buffiles under the logical streaming context so that we
5479 : * have those files until stream stop.
5480 : */
5481 362 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
5482 :
5483 : /*
5484 : * If this is the first streamed segment, create the changes file.
5485 : * Otherwise, just open the file for writing, in append mode.
5486 : */
5487 362 : if (first_segment)
5488 32 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
5489 : path);
5490 : else
5491 : {
5492 : /*
5493 : * Open the file and seek to the end of the file because we always
5494 : * append the changes file.
5495 : */
5496 330 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
5497 : path, O_RDWR, false);
5498 330 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
5499 : }
5500 :
5501 362 : MemoryContextSwitchTo(oldcxt);
5502 : }
5503 :
5504 : /*
5505 : * stream_close_file
5506 : * Close the currently open file with streamed changes.
     : *
     : * Resets stream_fd to NULL so a subsequent stream_open_file passes its
     : * "no file already open" Assert.
5507 : */
5508 : static void
5509 392 : stream_close_file(void)
5510 : {
5511 : Assert(stream_fd != NULL);
5512 :
5513 392 : BufFileClose(stream_fd);
5514 :
5515 392 : stream_fd = NULL;
5516 : }
5517 :
5518 : /*
5519 : * stream_write_change
5520 : * Serialize a change to a file for the current toplevel transaction.
5521 : *
5522 : * The change is serialized in a simple format, with length (not including
5523 : * the length), action code (identifying the message type) and message
5524 : * contents (without the subxact TransactionId value).
     : *
     : * s->cursor marks how much of the message has already been consumed (the
     : * XID part), so only the remainder is written to disk.
5525 : */
5526 : static void
5527 107554 : stream_write_change(char action, StringInfo s)
5528 : {
5529 : int len;
5530 :
5531 : Assert(stream_fd != NULL);
5532 :
5533 : /* total on-disk size, including the action type character */
5534 107554 : len = (s->len - s->cursor) + sizeof(char);
5535 :
5536 : /* first write the size */
5537 107554 : BufFileWrite(stream_fd, &len, sizeof(len));
5538 :
5539 : /* then the action */
5540 107554 : BufFileWrite(stream_fd, &action, sizeof(action));
5541 :
5542 : /* and finally the remaining part of the buffer (after the XID) */
5543 107554 : len = (s->len - s->cursor);
5544 :
5545 107554 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
5546 : }
5547 :
5548 : /*
5549 : * stream_open_and_write_change
5550 : * Serialize a message to a file for the given transaction.
5551 : *
5552 : * This function is similar to stream_write_change except that it will open the
5553 : * target file if not already before writing the message and close the file at
5554 : * the end.
     : *
     : * Only used outside an active streamed chunk (note the Assert on
     : * in_streamed_transaction).
5555 : */
5556 : static void
5557 5 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
5558 : {
5559 : Assert(!in_streamed_transaction);
5560 :
5561 5 : if (!stream_fd)
5562 5 : stream_start_internal(xid, false);
5563 :
5564 5 : stream_write_change(action, s);
5565 5 : stream_stop_internal(xid);
5566 : }
5567 :
5568 : /*
5569 : * Sets streaming options including replication slot name and origin start
5570 : * position. Workers need these options for logical replication.
     : *
     : * Also picks the highest logical replication protocol version the
     : * publisher's server version can support, and records in
     : * MyLogicalRepWorker->parallel_apply whether parallel streaming will be
     : * used.
5571 : */
5572 : void
5573 456 : set_stream_options(WalRcvStreamOptions *options,
5574 : char *slotname,
5575 : XLogRecPtr *origin_startpos)
5576 : {
5577 : int server_version;
5578 :
5579 456 : options->logical = true;
5580 456 : options->startpoint = *origin_startpos;
5581 456 : options->slotname = slotname;
5582 :
5583 456 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
5584 456 : options->proto.logical.proto_version =
5585 456 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
5586 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
5587 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
5588 : LOGICALREP_PROTO_VERSION_NUM;
5589 :
5590 456 : options->proto.logical.publication_names = MySubscription->publications;
5591 456 : options->proto.logical.binary = MySubscription->binary;
5592 :
5593 : /*
5594 : * Assign the appropriate option value for streaming option according to
5595 : * the 'streaming' mode and the publisher's ability to support that mode.
5596 : */
5597 456 : if (server_version >= 160000 &&
5598 456 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
5599 : {
5600 422 : options->proto.logical.streaming_str = "parallel";
5601 422 : MyLogicalRepWorker->parallel_apply = true;
5602 : }
5603 34 : else if (server_version >= 140000 &&
5604 34 : MySubscription->stream != LOGICALREP_STREAM_OFF)
5605 : {
5606 26 : options->proto.logical.streaming_str = "on";
5607 26 : MyLogicalRepWorker->parallel_apply = false;
5608 : }
5609 : else
5610 : {
5611 8 : options->proto.logical.streaming_str = NULL;
5612 8 : MyLogicalRepWorker->parallel_apply = false;
5613 : }
5614 :
     : /* two_phase is enabled separately by the caller when appropriate */
5615 456 : options->proto.logical.twophase = false;
5616 456 : options->proto.logical.origin = pstrdup(MySubscription->origin);
5617 : }
5618 :
5619 : /*
5620 : * Cleanup the memory for subxacts and reset the related variables.
     : *
     : * Leaves subxact_data in the empty state that subxact_info_read asserts
     : * on entry.
5621 : */
5622 : static inline void
5623 375 : cleanup_subxact_info(void)
5624 : {
5625 375 : if (subxact_data.subxacts)
5626 87 : pfree(subxact_data.subxacts)
5627 :
5628 375 : subxact_data.subxacts = NULL;
5629 375 : subxact_data.subxact_last = InvalidTransactionId;
5630 375 : subxact_data.nsubxacts = 0;
5631 375 : subxact_data.nsubxacts_max = 0;
5632 : }
5633 :
5634 : /*
5635 : * Common function to run the apply loop with error handling. Disable the
5636 : * subscription, if necessary.
5637 : *
5638 : * Note that we don't handle FATAL errors which are probably because
5639 : * of system resource error and are not repeatable.
     : *
     : * Does not return normally in the error path: DisableSubscriptionAndExit
     : * calls proc_exit(), and otherwise the error is re-thrown.
5640 : */
5641 : void
5642 456 : start_apply(XLogRecPtr origin_startpos)
5643 : {
5644 456 : PG_TRY();
5645 : {
5646 456 : LogicalRepApplyLoop(origin_startpos);
5647 : }
5648 103 : PG_CATCH();
5649 : {
5650 : /*
5651 : * Reset the origin state to prevent the advancement of origin
5652 : * progress if we fail to apply. Otherwise, this will result in
5653 : * transaction loss as that transaction won't be sent again by the
5654 : * server.
5655 : */
5656 103 : replorigin_xact_clear(true);
5657 :
5658 103 : if (MySubscription->disableonerr)
5659 3 : DisableSubscriptionAndExit();
5660 : else
5661 : {
5662 : /*
5663 : * Report the worker failed while applying changes. Abort the
5664 : * current transaction so that the stats message is sent in an
5665 : * idle state.
5666 : */
5667 100 : AbortOutOfAnyTransaction();
5668 100 : pgstat_report_subscription_error(MySubscription->oid);
5669 :
5670 100 : PG_RE_THROW();
5671 : }
5672 : }
5673 0 : PG_END_TRY();
5674 : }
5675 :
5676 : /*
5677 : * Runs the leader apply worker.
5678 : *
5679 : * It sets up replication origin, streaming options and then starts streaming.
     : *
     : * Also enables two_phase mode on the catalog when it was pending and all
     : * tablesyncs are ready.  Does not return; start_apply exits the process.
5680 : */
5681 : static void
5682 304 : run_apply_worker(void)
5683 : {
5684 : char originname[NAMEDATALEN];
5685 304 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
5686 304 : char *slotname = NULL;
5687 : WalRcvStreamOptions options;
5688 : ReplOriginId originid;
5689 : TimeLineID startpointTLI;
5690 : char *err;
5691 : bool must_use_password;
5692 :
5693 304 : slotname = MySubscription->slotname;
5694 :
5695 : /*
5696 : * This shouldn't happen if the subscription is enabled, but guard against
5697 : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5698 : * slot is NULL.)
5699 : */
5700 304 : if (!slotname)
5701 0 : ereport(ERROR,
5702 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5703 : errmsg("subscription has no replication slot set")));
5704 :
     : /*
     : * Setup replication origin tracking.  The origin is created on the fly
     : * if it does not exist yet, and the start position is the last recorded
     : * origin progress.
     : */
5705 : /* Setup replication origin tracking. */
5706 304 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
5707 : originname, sizeof(originname));
5708 304 : StartTransactionCommand();
5709 304 : originid = replorigin_by_name(originname, true);
5710 304 : if (!OidIsValid(originid))
5711 0 : originid = replorigin_create(originname);
5712 304 : replorigin_session_setup(originid, 0);
5713 303 : replorigin_xact_state.origin = originid;
5714 303 : origin_startpos = replorigin_session_get_progress(false);
5715 303 : CommitTransactionCommand();
5716 :
5717 : /* Is the use of a password mandatory? */
5718 579 : must_use_password = MySubscription->passwordrequired &&
5719 276 : !MySubscription->ownersuperuser;
5720 :
5721 303 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
5722 : true, must_use_password,
5723 : MySubscription->name, &err);
5724 :
5725 293 : if (LogRepWorkerWalRcvConn == NULL)
5726 35 : ereport(ERROR,
5727 : (errcode(ERRCODE_CONNECTION_FAILURE),
5728 : errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5729 : MySubscription->name, err)));
5730 :
5731 : /*
5732 : * We don't really use the output identify_system for anything but it does
5733 : * some initializations on the upstream so let's still call it.
5734 : */
5735 258 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
5736 :
5737 258 : set_apply_error_context_origin(originname);
5738 :
5739 258 : set_stream_options(&options, slotname, &origin_startpos);
5740 :
5741 : /*
5742 : * Even when the two_phase mode is requested by the user, it remains as
5743 : * the tri-state PENDING until all tablesyncs have reached READY state.
5744 : * Only then, can it become ENABLED.
5745 : *
5746 : * Note: If the subscription has no tables then leave the state as
5747 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5748 : * work.
5749 : */
5750 274 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
5751 16 : AllTablesyncsReady())
5752 : {
5753 : /* Start streaming with two_phase enabled */
5754 9 : options.proto.logical.twophase = true;
5755 9 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5756 :
5757 9 : StartTransactionCommand();
5758 :
5759 : /*
5760 : * Updating pg_subscription might involve TOAST table access, so
5761 : * ensure we have a valid snapshot.
5762 : */
5763 9 : PushActiveSnapshot(GetTransactionSnapshot());
5764 :
5765 9 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
5766 9 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
5767 9 : PopActiveSnapshot();
5768 9 : CommitTransactionCommand();
5769 : }
5770 : else
5771 : {
5772 249 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5773 : }
5774 :
5775 258 : ereport(DEBUG1,
5776 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5777 : MySubscription->name,
5778 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
5779 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
5780 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
5781 : "?")));
5782 :
5783 : /* Run the main loop. */
5784 258 : start_apply(origin_startpos);
5785 : }
5786 :
5787 : /*
5788 : * Common initialization for leader apply worker, parallel apply worker,
5789 : * tablesync worker and sequencesync worker.
5790 : *
5791 : * Initialize the database connection, in-memory subscription and necessary
5792 : * config options.
     : *
     : * Exits the process (rather than returning) when the subscription has been
     : * dropped or disabled during startup, or when a restart is required.
5793 : */
5794 : void
5795 611 : InitializeLogRepWorker(void)
5796 : {
5797 : /* Run as replica session replication role. */
5798 611 : SetConfigOption("session_replication_role", "replica",
5799 : PGC_SUSET, PGC_S_OVERRIDE);
5800 :
5801 : /* Connect to our database. */
5802 611 : BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
5803 611 : MyLogicalRepWorker->userid,
5804 : 0);
5805 :
5806 : /*
5807 : * Set always-secure search path, so malicious users can't redirect user
5808 : * code (e.g. pg_index.indexprs).
5809 : */
5810 609 : SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5811 :
5812 609 : ApplyContext = AllocSetContextCreate(TopMemoryContext,
5813 : "ApplyContext",
5814 : ALLOCSET_DEFAULT_SIZES);
5815 :
5816 609 : StartTransactionCommand();
5817 :
5818 : /*
5819 : * Lock the subscription to prevent it from being concurrently dropped,
5820 : * then re-verify its existence. After the initialization, the worker will
5821 : * be terminated gracefully if the subscription is dropped.
5822 : */
5823 609 : LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
5824 : AccessShareLock);
5825 :
5826 608 : MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
5827 :
5828 608 : if (MySubscription)
5829 : {
     : /* Keep the subscription's memory alive beyond this transaction. */
5830 541 : MemoryContextSetParent(MySubscription->cxt, ApplyContext);
5831 : }
5832 : else
5833 : {
5834 67 : ereport(LOG,
5835 : (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5836 : MyLogicalRepWorker->subid)));
5837 :
5838 : /* Ensure we remove no-longer-useful entry for worker's start time */
5839 67 : if (am_leader_apply_worker())
5840 67 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5841 :
5842 67 : proc_exit(0);
5843 : }
5844 :
5845 541 : MySubscriptionValid = true;
5846 :
5847 541 : if (!MySubscription->enabled)
5848 : {
5849 1 : ereport(LOG,
5850 : (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5851 : MySubscription->name)));
5852 :
5853 1 : apply_worker_exit();
5854 : }
5855 :
5856 : /*
5857 : * Restart the worker if retain_dead_tuples was enabled during startup.
5858 : *
5859 : * At this point, the replication slot used for conflict detection might
5860 : * not exist yet, or could be dropped soon if the launcher perceives
5861 : * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5862 : * oldest_nonremovable_xid when the slot is absent or at risk of being
5863 : * dropped, a restart is initiated.
5864 : *
5865 : * The oldest_nonremovable_xid should be initialized only when the
5866 : * subscription's retention is active before launching the worker. See
5867 : * logicalrep_worker_launch.
5868 : */
5869 540 : if (am_leader_apply_worker() &&
5870 304 : MySubscription->retaindeadtuples &&
5871 14 : MySubscription->retentionactive &&
5872 14 : !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
5873 : {
5874 0 : ereport(LOG,
5875 : errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5876 : MySubscription->name, "retain_dead_tuples"));
5877 :
5878 0 : apply_worker_exit();
5879 : }
5880 :
5881 : /* Setup synchronous commit according to the user's wishes */
5882 540 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
5883 : PGC_BACKEND, PGC_S_OVERRIDE);
5884 :
5885 : /* Change wal_receiver_timeout according to the user's wishes */
5886 540 : set_wal_receiver_timeout();
5887 :
5888 : /*
5889 : * Keep us informed about subscription or role changes. Note that the
5890 : * role's superuser privilege can be revoked.
5891 : */
5892 540 : CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
5893 : subscription_change_cb,
5894 : (Datum) 0);
5895 : /* Changes to foreign servers may affect subscriptions using SERVER. */
5896 540 : CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
5897 : subscription_change_cb,
5898 : (Datum) 0);
5899 : /* Changes to user mappings may affect subscriptions using SERVER. */
5900 540 : CacheRegisterSyscacheCallback(USERMAPPINGOID,
5901 : subscription_change_cb,
5902 : (Datum) 0);
5903 :
5904 : /*
5905 : * Changes to FDW connection_function may affect subscriptions using
5906 : * SERVER.
5907 : */
5908 540 : CacheRegisterSyscacheCallback(FOREIGNDATAWRAPPEROID,
5909 : subscription_change_cb,
5910 : (Datum) 0);
5911 :
5912 540 : CacheRegisterSyscacheCallback(AUTHOID,
5913 : subscription_change_cb,
5914 : (Datum) 0);
5915 :
5916 540 : if (am_tablesync_worker())
5917 213 : ereport(LOG,
5918 : errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5919 : MySubscription->name,
5920 : get_rel_name(MyLogicalRepWorker->relid)));
5921 327 : else if (am_sequencesync_worker())
5922 11 : ereport(LOG,
5923 : errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
5924 : MySubscription->name));
5925 : else
5926 316 : ereport(LOG,
5927 : errmsg("logical replication apply worker for subscription \"%s\" has started",
5928 : MySubscription->name));
5929 :
5930 540 : CommitTransactionCommand();
5931 :
5932 : /*
5933 : * Register a callback to reset the origin state before aborting any
5934 : * pending transaction during shutdown (see ShutdownPostgres()). This will
5935 : * avoid origin advancement for an incomplete transaction which could
5936 : * otherwise lead to its loss as such a transaction won't be sent by the
5937 : * server again.
5938 : *
5939 : * Note that even a LOG or DEBUG statement placed after setting the origin
5940 : * state may process a shutdown signal before committing the current apply
5941 : * operation. So, it is important to register such a callback here.
5942 : *
5943 : * Register this callback here to ensure that all types of logical
5944 : * replication workers that set up origins and apply remote transactions
5945 : * are protected.
5946 : */
5947 540 : before_shmem_exit(on_exit_clear_xact_state, (Datum) 0);
5948 : }
5949 :
5950 : /*
5951 : * Callback on exit to clear transaction-level replication origin state.
     : *
     : * Registered via before_shmem_exit in InitializeLogRepWorker; code and arg
     : * are unused.
5952 : */
5953 : static void
5954 540 : on_exit_clear_xact_state(int code, Datum arg)
5955 : {
5956 540 : replorigin_xact_clear(true);
5957 : }
5958 :
5959 : /*
5960 : * Common function to setup the leader apply, tablesync and sequencesync worker.
     : *
     : * worker_slot identifies the shared-memory logical replication worker slot
     : * this process should attach to.  Parallel apply workers do not come
     : * through here (see the Assert below).
5961 : */
5962 : void
5963 599 : SetupApplyOrSyncWorker(int worker_slot)
5964 : {
5965 : /* Attach to slot */
5966 599 : logicalrep_worker_attach(worker_slot);
5967 :
5968 : Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker());
5969 :
5970 : /* Setup signal handling */
5971 599 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
5972 599 : BackgroundWorkerUnblockSignals();
5973 :
5974 : /*
5975 : * We don't currently need any ResourceOwner in a walreceiver process, but
5976 : * if we did, we could call CreateAuxProcessResourceOwner here.
5977 : */
5978 :
5979 : /* Initialise stats to a sanish value */
5980 599 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
5981 599 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
5982 :
5983 : /* Load the libpq-specific functions */
5984 599 : load_file("libpqwalreceiver", false);
5985 :
5986 599 : InitializeLogRepWorker();
5987 :
5988 : /* Connect to the origin and start the replication. */
5989 528 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5990 : MySubscription->conninfo);
5991 :
5992 : /*
5993 : * Setup callback for syscache so that we know when something changes in
5994 : * the subscription relation state.
5995 : */
5996 528 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
5997 : InvalidateSyncingRelStates,
5998 : (Datum) 0);
5999 : }
6000 :
6001 : /* Logical Replication Apply worker entry point */
     : /*
     : * main_arg carries the worker slot index.  InitializingApplyWorker is set
     : * while the common setup runs so other code can tell startup apart from
     : * normal operation.  Never returns; run_apply_worker exits the process.
     : */
6002 : void
6003 375 : ApplyWorkerMain(Datum main_arg)
6004 : {
6005 375 : int worker_slot = DatumGetInt32(main_arg);
6006 :
6007 375 : InitializingApplyWorker = true;
6008 :
6009 375 : SetupApplyOrSyncWorker(worker_slot);
6010 :
6011 304 : InitializingApplyWorker = false;
6012 :
6013 304 : run_apply_worker();
6014 :
6015 0 : proc_exit(0);
6016 : }
6017 :
6018 : /*
6019 : * After error recovery, disable the subscription in a new transaction
6020 : * and exit cleanly.
     : *
     : * Expects to be called from an error context (e.g. start_apply's
     : * PG_CATCH): it emits the pending error report, aborts any transaction,
     : * disables the subscription in pg_subscription, and calls proc_exit(0).
6021 : */
6022 : void
6023 4 : DisableSubscriptionAndExit(void)
6024 : {
6025 : /*
6026 : * Emit the error message, and recover from the error state to an idle
6027 : * state
6028 : */
6029 4 : HOLD_INTERRUPTS();
6030 :
6031 4 : EmitErrorReport();
6032 4 : AbortOutOfAnyTransaction();
6033 4 : FlushErrorState();
6034 :
6035 4 : RESUME_INTERRUPTS();
6036 :
6037 : /*
6038 : * Report the worker failed during sequence synchronization, table
6039 : * synchronization, or apply.
6040 : */
6041 4 : pgstat_report_subscription_error(MyLogicalRepWorker->subid);
6042 :
6043 : /* Disable the subscription */
6044 4 : StartTransactionCommand();
6045 :
6046 : /*
6047 : * Updating pg_subscription might involve TOAST table access, so ensure we
6048 : * have a valid snapshot.
6049 : */
6050 4 : PushActiveSnapshot(GetTransactionSnapshot());
6051 :
6052 4 : DisableSubscription(MySubscription->oid);
6053 4 : PopActiveSnapshot();
6054 4 : CommitTransactionCommand();
6055 :
6056 : /* Ensure we remove no-longer-useful entry for worker's start time */
6057 4 : if (am_leader_apply_worker())
6058 3 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
6059 :
6060 : /* Notify the subscription has been disabled and exit */
6061 4 : ereport(LOG,
6062 : errmsg("subscription \"%s\" has been disabled because of an error",
6063 : MySubscription->name));
6064 :
6065 : /*
6066 : * Skip the track_commit_timestamp check when disabling the worker due to
6067 : * an error, as verifying commit timestamps is unnecessary in this
6068 : * context.
6069 : */
6070 4 : CheckSubDeadTupleRetention(false, true, WARNING,
6071 4 : MySubscription->retaindeadtuples,
6072 4 : MySubscription->retentionactive, false);
6073 :
6074 4 : proc_exit(0);
6075 : }
6076 :
6077 : /*
6078 : * Is current process a logical replication worker?
     : *
     : * True exactly when this process has a MyLogicalRepWorker slot assigned.
6079 : */
6080 : bool
6081 2785 : IsLogicalWorker(void)
6082 : {
6083 2785 : return MyLogicalRepWorker != NULL;
6084 : }
6085 :
6086 : /*
6087 : * Is current process a logical replication parallel apply worker?
     : *
     : * The IsLogicalWorker() check guards am_parallel_apply_worker(), which
     : * requires an attached worker slot.
6088 : */
6089 : bool
6090 2028 : IsLogicalParallelApplyWorker(void)
6091 : {
6092 2028 : return IsLogicalWorker() && am_parallel_apply_worker();
6093 : }
6094 :
6095 : /*
6096 : * Start skipping changes of the transaction if the given LSN matches the
6097 : * LSN specified by subscription's skiplsn.
     : *
     : * finish_lsn is the remote transaction's commit/prepare LSN.  Skipping is
     : * activated by setting skip_xact_finish_lsn (see is_skipping_changes()).
6098 : */
6099 : static void
6100 578 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
6101 : {
6102 : Assert(!is_skipping_changes());
6103 : Assert(!in_remote_transaction);
6104 : Assert(!in_streamed_transaction);
6105 :
6106 : /*
6107 : * Quick return if it's not requested to skip this transaction. This
6108 : * function is called for every remote transaction and we assume that
6109 : * skipping the transaction is not used often.
6110 : */
6111 578 : if (likely(!XLogRecPtrIsValid(MySubscription->skiplsn) ||
6112 : MySubscription->skiplsn != finish_lsn))
6113 575 : return;
6114 :
6115 : /* Start skipping all changes of this transaction */
6116 3 : skip_xact_finish_lsn = finish_lsn;
6117 :
6118 3 : ereport(LOG,
6119 : errmsg("logical replication starts skipping transaction at LSN %X/%08X",
6120 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6121 : }
6122 :
6123 : /*
6124 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
6125 : */
6126 : static void
6127 30 : stop_skipping_changes(void)
6128 : {
6129 30 : if (!is_skipping_changes())
6130 27 : return;
6131 :
6132 3 : ereport(LOG,
6133 : errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6134 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6135 :
6136 : /* Stop skipping changes; is_skipping_changes() becomes false */
6137 3 : skip_xact_finish_lsn = InvalidXLogRecPtr;
6138 : }
6139 :
6140 : /*
6141 : * Clear subskiplsn of pg_subscription catalog.
6142 : *
6143 : * finish_lsn is the transaction's finish LSN that is used to check if the
6144 : * subskiplsn matches it. If not matched, we raise a warning when clearing the
6145 : * subskiplsn in order to inform users of cases where, e.g., the user mistakenly
6146 : * specified the wrong subskiplsn.
6147 : */
6148 : static void
6149 562 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
6150 : {
6151 : Relation rel;
6152 : Form_pg_subscription subform;
6153 : HeapTuple tup;
6154 562 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
6155 562 : bool started_tx = false;
6156 :
6157 562 : if (likely(!XLogRecPtrIsValid(myskiplsn)) || am_parallel_apply_worker())
6158 559 : return;
6159 :
6160 3 : if (!IsTransactionState())
6161 : {
6162 1 : StartTransactionCommand();
6163 1 : started_tx = true;
6164 : }
6165 :
6166 : /*
6167 : * Updating pg_subscription might involve TOAST table access, so ensure we
6168 : * have a valid snapshot.
6169 : */
6170 3 : PushActiveSnapshot(GetTransactionSnapshot());
6171 :
6172 : /*
6173 : * Protect subskiplsn of pg_subscription from being concurrently updated
6174 : * while clearing it.
6175 : */
6176 3 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
6177 : AccessShareLock);
6178 :
6179 3 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
6180 :
6181 : /* Fetch a modifiable copy of the subscription's catalog tuple. */
6182 3 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
6183 : ObjectIdGetDatum(MySubscription->oid));
6184 :
6185 3 : if (!HeapTupleIsValid(tup))
6186 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6187 :
6188 3 : subform = (Form_pg_subscription) GETSTRUCT(tup);
6189 :
6190 : /*
6191 : * Clear the subskiplsn. If the user has already changed subskiplsn before
6192 : * clearing it we don't update the catalog and the replication origin
6193 : * state won't get advanced. So in the worst case, if the server crashes
6194 : * before sending an acknowledgment of the flush position the transaction
6195 : * will be sent again and the user needs to set subskiplsn again. We can
6196 : * reduce the possibility by logging a replication origin WAL record to
6197 : * advance the origin LSN instead but there is no way to advance the
6198 : * origin timestamp and it doesn't seem to be worth doing anything about
6199 : * it since it's a very rare case.
6200 : */
6201 3 : if (subform->subskiplsn == myskiplsn)
6202 : {
6203 : bool nulls[Natts_pg_subscription];
6204 : bool replaces[Natts_pg_subscription];
6205 : Datum values[Natts_pg_subscription];
6206 :
6207 3 : memset(values, 0, sizeof(values));
6208 3 : memset(nulls, false, sizeof(nulls));
6209 3 : memset(replaces, false, sizeof(replaces));
6210 :
6211 : /* reset subskiplsn to InvalidXLogRecPtr */
6212 3 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
6213 3 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
6214 :
6215 3 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
6216 : replaces);
6217 3 : CatalogTupleUpdate(rel, &tup->t_self, tup);
6218 :
6219 3 : if (myskiplsn != finish_lsn)
6220 0 : ereport(WARNING,
6221 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6222 : errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6223 : LSN_FORMAT_ARGS(finish_lsn),
6224 : LSN_FORMAT_ARGS(myskiplsn)));
6225 : }
6226 :
6227 3 : heap_freetuple(tup);
6228 3 : table_close(rel, NoLock);
6229 :
6230 3 : PopActiveSnapshot();
6231 :
6232 3 : if (started_tx)
6233 1 : CommitTransactionCommand();
6234 : }
6235 :
6236 : /* Error context callback: report origin name, message type, remote xid,
6237 : * finish LSN, and target relation/column (whichever are set in
6238 : * apply_error_callback_arg) for the change being applied */
6239 : void
6240 4616 : apply_error_callback(void *arg)
6241 : {
6242 4616 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
6243 :
6244 4616 : if (apply_error_callback_arg.command == 0)
6245 4187 : return;
6246 :
6247 : Assert(errarg->origin_name);
6248 :
6249 429 : if (errarg->rel == NULL)
6250 : {
6251 341 : if (!TransactionIdIsValid(errarg->remote_xid))
6252 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6253 : errarg->origin_name,
6254 : logicalrep_message_type(errarg->command));
6255 341 : else if (!XLogRecPtrIsValid(errarg->finish_lsn))
6256 263 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6257 : errarg->origin_name,
6258 : logicalrep_message_type(errarg->command),
6259 : errarg->remote_xid);
6260 : else
6261 156 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6262 : errarg->origin_name,
6263 : logicalrep_message_type(errarg->command),
6264 : errarg->remote_xid,
6265 78 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6266 : }
6267 : else
6268 : {
6269 88 : if (errarg->remote_attnum < 0)
6270 : {
6271 88 : if (!XLogRecPtrIsValid(errarg->finish_lsn))
6272 4 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6273 : errarg->origin_name,
6274 : logicalrep_message_type(errarg->command),
6275 2 : errarg->rel->remoterel.nspname,
6276 2 : errarg->rel->remoterel.relname,
6277 : errarg->remote_xid);
6278 : else
6279 172 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6280 : errarg->origin_name,
6281 : logicalrep_message_type(errarg->command),
6282 86 : errarg->rel->remoterel.nspname,
6283 86 : errarg->rel->remoterel.relname,
6284 : errarg->remote_xid,
6285 86 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6286 : }
6287 : else
6288 : {
6289 0 : if (!XLogRecPtrIsValid(errarg->finish_lsn))
6290 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6291 : errarg->origin_name,
6292 : logicalrep_message_type(errarg->command),
6293 0 : errarg->rel->remoterel.nspname,
6294 0 : errarg->rel->remoterel.relname,
6295 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6296 : errarg->remote_xid);
6297 : else
6298 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6299 : errarg->origin_name,
6300 : logicalrep_message_type(errarg->command),
6301 0 : errarg->rel->remoterel.nspname,
6302 0 : errarg->rel->remoterel.relname,
6303 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6304 : errarg->remote_xid,
6305 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6306 : }
6307 : }
6308 : }
6307 :
6308 : /* Record the remote xid and finish LSN that apply_error_callback reports */
6309 : static inline void
6310 2991 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
6311 : {
6312 2991 : apply_error_callback_arg.remote_xid = xid;
6313 2991 : apply_error_callback_arg.finish_lsn = lsn;
6314 2991 : }
6315 :
6316 : /* Reset all information of apply error callback (command, relation,
6317 : * column number, and transaction info) */
6318 : static inline void
6319 1457 : reset_apply_error_context_info(void)
6320 : {
6321 1457 : apply_error_callback_arg.command = 0;
6322 1457 : apply_error_callback_arg.rel = NULL;
6323 1457 : apply_error_callback_arg.remote_attnum = -1;
6324 1457 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
6325 1457 : }
6325 :
6326 : /*
6327 : * Request wakeup of the workers for the given subscription OID
6328 : * at commit of the current transaction.
6329 : *
6330 : * This is used to ensure that the workers process assorted changes
6331 : * as soon as possible.  The pending subids are accumulated in
6332 : * TopTransactionContext and consumed by AtEOXact_LogicalRepWorkers().
6333 : */
6334 : void
6335 280 : LogicalRepWorkersWakeupAtCommit(Oid subid)
6336 : {
6337 : MemoryContext oldcxt;
6338 :
6339 280 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
6340 280 : on_commit_wakeup_workers_subids =
6341 280 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
6342 280 : MemoryContextSwitchTo(oldcxt);
6343 280 : }
6343 :
6344 : /*
6345 : * Wake up the workers of any subscriptions that were changed in this xact.
6346 : * On non-commit, the pending wakeup list is simply discarded.
6347 : */
6348 : void
6349 651530 : AtEOXact_LogicalRepWorkers(bool isCommit)
6350 : {
6351 651530 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
6352 : {
6353 : ListCell *lc;
6354 :
6355 274 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
6356 548 : foreach(lc, on_commit_wakeup_workers_subids)
6357 : {
6358 274 : Oid subid = lfirst_oid(lc);
6359 : List *workers;
6360 : ListCell *lc2;
6361 :
6362 274 : workers = logicalrep_workers_find(subid, true, false);
6363 345 : foreach(lc2, workers)
6364 : {
6365 71 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
6366 :
6367 71 : logicalrep_worker_wakeup_ptr(worker);
6368 : }
6369 : }
6370 274 : LWLockRelease(LogicalRepWorkerLock);
6371 : }
6372 :
6373 : /* The List storage will be reclaimed automatically in xact cleanup. */
6374 651530 : on_commit_wakeup_workers_subids = NIL;
6375 651530 : }
6375 :
6376 : /*
6377 : * Allocate the origin name in long-lived context (ApplyContext) for the
6378 : * error context message, so it survives per-transaction memory resets.
6379 : */
6380 : void
6381 468 : set_apply_error_context_origin(char *originname)
6382 : {
6383 468 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
6384 : originname);
6385 468 : }
6385 :
6386 : /*
6387 : * Return the action to be taken for the given transaction. See
6388 : * TransApplyAction for information on each of the actions.
6389 : *
6390 : * *winfo is assigned to the destination parallel worker info when the leader
6391 : * apply worker has to pass all the transaction's changes to the parallel
6392 : * apply worker.  Otherwise *winfo is set to NULL.
6393 : */
6394 : static TransApplyAction
6395 350016 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
6396 : {
6397 350016 : *winfo = NULL;
6398 :
6399 350016 : if (am_parallel_apply_worker())
6400 : {
6401 68922 : return TRANS_PARALLEL_APPLY;
6402 : }
6403 :
6404 : /*
6405 : * If we are processing this transaction using a parallel apply worker
6406 : * then either we send the changes to the parallel worker or if the worker
6407 : * is busy then serialize the changes to the file which will later be
6408 : * processed by the parallel worker.
6409 : */
6410 281094 : *winfo = pa_find_worker(xid);
6411 :
6412 281094 : if (*winfo && (*winfo)->serialize_changes)
6413 : {
6414 5037 : return TRANS_LEADER_PARTIAL_SERIALIZE;
6415 : }
6416 276057 : else if (*winfo)
6417 : {
6418 68906 : return TRANS_LEADER_SEND_TO_PARALLEL;
6419 : }
6420 :
6421 : /*
6422 : * If there is no parallel worker involved to process this transaction
6423 : * then we either directly apply the change or serialize it to a file
6424 : * which will later be applied when the transaction finish message is
6425 : * processed.
6426 : */
6427 : else if (in_streamed_transaction)
6428 : {
6429 103197 : return TRANS_LEADER_SERIALIZE;
6430 : }
6431 : else
6432 : {
6433 103954 : return TRANS_LEADER_APPLY;
6434 : }
6435 : }
|