Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server facing code and shares libpqwalreceiver
19 : * module with walreceiver for providing the libpq specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option as on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions has
33 : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which is then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in tmp file directory by default, and the filenames
38 : * include both the XID of the toplevel transaction and OID of the
39 : * subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID don't interfere.
41 : *
42 : * We use BufFiles instead of using normal temporary files because (a) the
43 : * BufFile infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) provides a way for automatic clean up on the error and (c) provides
45 : * a way to survive these files across local transactions and allow to open and
46 : * close at stream start and close. We decided to use FileSet
47 : * infrastructure as without that it deletes the files on the closure of the
48 : * file and if we decide to keep stream files open across the start/stop stream
49 : * then it will consume a lot of memory (more than 8K for each BufFile and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we don't use FileSet then we also need to invent
53 : * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 : * the file we desired across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option as parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : * tablesync worker might not get such a prepared transaction because say it
72 : * was prior to the initial consistent point but might have got some later
73 : * commits. Now, the tablesync worker will exit without doing anything for the
74 : * prepared transaction skipped by the apply worker as the sync location for it
75 : * will be already ahead of the apply worker's current location. This would lead
76 : * to an "empty prepare", because later when the apply worker does the commit
77 : * prepare, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this, and similar prepare confusions the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * ProcessSyncingTablesForApply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING it starts streaming messages with the
98 : * two_phase option which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time we have not enabled two_phase, the
101 : * publisher (replication server) would have skipped some prepares but we
102 : * ensure that such prepares are sent along with commit prepare, see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * Finally, to avoid problems mentioned in previous paragraphs from any
113 : * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
114 : * to 'off' and then again back to 'on') there is a restriction for
115 : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 : * the two_phase tri-state is ENABLED, except when copy_data = false.
117 : *
118 : * We can get prepare of the same GID more than once for the genuine cases
119 : * where we have defined multiple subscriptions for publications on the same
120 : * server and prepared transaction has operations on tables subscribed to those
121 : * subscriptions. For such cases, if we use the GID sent by publisher one of
122 : * the prepares will be successful and others will fail, in which case the
123 : * server will send them again. Now, this can lead to a deadlock if user has
124 : * set synchronous_standby_names for all the subscriptions on subscriber. To
125 : * avoid such deadlocks, we generate a unique GID (consisting of the
126 : * subscription oid and the xid of the prepared transaction) for each prepare
127 : * transaction on the subscriber.
128 : *
129 : * FAILOVER
130 : * ----------------------
131 : * The logical slot on the primary can be synced to the standby by specifying
132 : * failover = true when creating the subscription. Enabling failover allows us
133 : * to smoothly transition to the promoted standby, ensuring that we can
134 : * subscribe to the new primary without losing any data.
135 : *
136 : * RETAIN DEAD TUPLES
137 : * ----------------------
138 : * Each apply worker that enabled retain_dead_tuples option maintains a
139 : * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
140 : * prevent dead rows from being removed prematurely when the apply worker still
141 : * needs them to detect update_deleted conflicts. Additionally, this helps to
142 : * retain the required commit_ts module information, which further helps to
143 : * detect update_origin_differs and delete_origin_differs conflicts reliably, as
144 : * otherwise, vacuum freeze could remove the required information.
145 : *
146 : * The logical replication launcher manages an internal replication slot named
147 : * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 : * transaction ID from all apply workers to determine the appropriate xmin for
149 : * the slot, thereby retaining necessary tuples.
150 : *
151 : * The non-removable transaction ID in the apply worker is advanced to the
152 : * oldest running transaction ID once all concurrent transactions on the
153 : * publisher have been applied and flushed locally. The process involves:
154 : *
155 : * - RDT_GET_CANDIDATE_XID:
156 : * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 : * candidate xid.
158 : *
159 : * - RDT_REQUEST_PUBLISHER_STATUS:
160 : * Send a message to the walsender requesting the publisher status, which
161 : * includes the latest WAL write position and information about transactions
162 : * that are in the commit phase.
163 : *
164 : * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 : * Wait for the status from the walsender. After receiving the first status,
166 : * do not proceed if there are concurrent remote transactions that are still
167 : * in the commit phase. These transactions might have been assigned an
168 : * earlier commit timestamp but have not yet written the commit WAL record.
169 : * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 : * until all these transactions have completed.
171 : *
172 : * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 : * Advance the non-removable transaction ID if the current flush location has
174 : * reached or surpassed the last received WAL position.
175 : *
176 : * - RDT_STOP_CONFLICT_INFO_RETENTION:
177 : * This phase is required only when max_retention_duration is defined. We
178 : * enter this phase if the wait time in either the
179 : * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
180 : * configured max_retention_duration. In this phase,
181 : * pg_subscription.subretentionactive is updated to false within a new
182 : * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
183 : *
184 : * - RDT_RESUME_CONFLICT_INFO_RETENTION:
185 : * This phase is required only when max_retention_duration is defined. We
186 : * enter this phase if the retention was previously stopped, and the time
187 : * required to advance the non-removable transaction ID in the
188 : * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
189 : * (or if max_retention_duration is set to 0). During this phase,
190 : * pg_subscription.subretentionactive is updated to true within a new
191 : * transaction, and the worker will be restarted.
192 : *
193 : * The overall state progression is: GET_CANDIDATE_XID ->
194 : * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
195 : * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
196 : * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
197 : *
198 : * Retaining the dead tuples for this period is sufficient for ensuring
199 : * eventual consistency using last-update-wins strategy, as dead tuples are
200 : * useful for detecting conflicts only during the application of concurrent
201 : * transactions from remote nodes. After applying and flushing all remote
202 : * transactions that occurred concurrently with the tuple DELETE, any
203 : * subsequent UPDATE from a remote node should have a later timestamp. In such
204 : * cases, it is acceptable to detect an update_missing scenario and convert the
205 : * UPDATE to an INSERT when applying it. But, for concurrent remote
206 : * transactions with earlier timestamps than the DELETE, detecting
207 : * update_deleted is necessary, as the UPDATEs in remote transactions should be
208 : * ignored if their timestamp is earlier than that of the dead tuples.
209 : *
210 : * Note that advancing the non-removable transaction ID is not supported if the
211 : * publisher is also a physical standby. This is because the logical walsender
212 : * on the standby can only get the WAL replay position but there may be more
213 : * WALs that are being replicated from the primary and those WALs could have
214 : * earlier commit timestamp.
215 : *
216 : * Similarly, when the publisher has subscribed to another publisher,
217 : * information necessary for conflict detection cannot be retained for
218 : * changes from origins other than the publisher. This is because publisher
219 : * lacks the information on concurrent transactions of other publishers to
220 : * which it subscribes. As the information on concurrent transactions is
221 : * unavailable beyond subscriber's immediate publishers, the non-removable
222 : * transaction ID might be advanced prematurely before changes from other
223 : * origins have been fully applied.
224 : *
225 : * XXX Retaining information for changes from other origins might be possible
226 : * by requesting the subscription on that origin to enable retain_dead_tuples
227 : * and fetching the conflict detection slot.xmin along with the publisher's
228 : * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
229 : * wait for the remote slot's xmin to reach the oldest active transaction ID,
230 : * ensuring that all transactions from other origins have been applied on the
231 : * publisher, thereby getting the latest WAL position that includes all
232 : * concurrent changes. However, this approach may impact performance, so it
233 : * might not be worth the effort.
234 : *
235 : * XXX It seems feasible to get the latest commit's WAL location from the
236 : * publisher and wait till that is applied. However, we can't do that
237 : * because commit timestamps can regress as a commit with a later LSN is not
238 : * guaranteed to have a later timestamp than those with earlier LSNs. Having
239 : * said that, even if that is possible, it won't improve performance much as
240 : * the apply always lags and moves slowly as compared with the transactions
241 : * on the publisher.
242 : *-------------------------------------------------------------------------
243 : */
244 :
245 : #include "postgres.h"
246 :
247 : #include <sys/stat.h>
248 : #include <unistd.h>
249 :
250 : #include "access/commit_ts.h"
251 : #include "access/table.h"
252 : #include "access/tableam.h"
253 : #include "access/tupconvert.h"
254 : #include "access/twophase.h"
255 : #include "access/xact.h"
256 : #include "catalog/indexing.h"
257 : #include "catalog/pg_inherits.h"
258 : #include "catalog/pg_subscription.h"
259 : #include "catalog/pg_subscription_rel.h"
260 : #include "commands/subscriptioncmds.h"
261 : #include "commands/tablecmds.h"
262 : #include "commands/trigger.h"
263 : #include "executor/executor.h"
264 : #include "executor/execPartition.h"
265 : #include "libpq/pqformat.h"
266 : #include "miscadmin.h"
267 : #include "optimizer/optimizer.h"
268 : #include "parser/parse_relation.h"
269 : #include "pgstat.h"
270 : #include "port/pg_bitutils.h"
271 : #include "postmaster/bgworker.h"
272 : #include "postmaster/interrupt.h"
273 : #include "postmaster/walwriter.h"
274 : #include "replication/conflict.h"
275 : #include "replication/logicallauncher.h"
276 : #include "replication/logicalproto.h"
277 : #include "replication/logicalrelation.h"
278 : #include "replication/logicalworker.h"
279 : #include "replication/origin.h"
280 : #include "replication/slot.h"
281 : #include "replication/walreceiver.h"
282 : #include "replication/worker_internal.h"
283 : #include "rewrite/rewriteHandler.h"
284 : #include "storage/buffile.h"
285 : #include "storage/ipc.h"
286 : #include "storage/latch.h"
287 : #include "storage/lmgr.h"
288 : #include "storage/procarray.h"
289 : #include "tcop/tcopprot.h"
290 : #include "utils/acl.h"
291 : #include "utils/guc.h"
292 : #include "utils/inval.h"
293 : #include "utils/lsyscache.h"
294 : #include "utils/memutils.h"
295 : #include "utils/pg_lsn.h"
296 : #include "utils/rel.h"
297 : #include "utils/rls.h"
298 : #include "utils/snapmgr.h"
299 : #include "utils/syscache.h"
300 : #include "utils/usercontext.h"
301 : #include "utils/wait_event.h"
302 :
303 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
304 :
 : /*
 : * Element of the lsn_mapping list below: records the remote WAL end
 : * position of an applied change together with the local WAL position at
 : * which it ends locally. NOTE(review): presumably consumed when reporting
 : * flush progress to the publisher (see send_feedback) — confirm.
 : */
305 : typedef struct FlushPosition
306 : {
307 : dlist_node node; /* list link in lsn_mapping */
308 : XLogRecPtr local_end; /* local end LSN for the change */
309 : XLogRecPtr remote_end; /* corresponding remote end LSN */
310 : } FlushPosition;
311 :
312 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
313 :
 : /*
 : * Executor infrastructure assembled for applying remote changes to one
 : * target relation; passed to the apply_handle_*_internal routines and to
 : * apply_handle_tuple_routing (see prototypes below).
 : */
314 : typedef struct ApplyExecutionData
315 : {
316 : EState *estate; /* executor state, used to track resources */
317 :
318 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
319 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
320 :
321 : /* These fields are used when the target relation is partitioned: */
322 : ModifyTableState *mtstate; /* dummy ModifyTable state */
323 : PartitionTupleRouting *proute; /* partition routing info */
324 : } ApplyExecutionData;
325 :
326 : /* Struct for saving and restoring apply errcontext information */
327 : typedef struct ApplyErrorCallbackArg
328 : {
329 : LogicalRepMsgType command; /* 0 if invalid */
330 : LogicalRepRelMapEntry *rel; /* NULL if invalid */
331 :
332 : /* Remote node information */
333 : int remote_attnum; /* -1 if invalid */
334 : TransactionId remote_xid; /* InvalidTransactionId if invalid */
335 : XLogRecPtr finish_lsn; /* InvalidXLogRecPtr if invalid */
336 : char *origin_name; /* NULL if invalid */
337 : } ApplyErrorCallbackArg;
338 :
339 : /*
340 : * The action to be taken for the changes in the transaction.
341 : *
342 : * TRANS_LEADER_APPLY:
343 : * This action means that we are in the leader apply worker or table sync
344 : * worker. The changes of the transaction are either directly applied or
345 : * are read from temporary files (for streaming transactions) and then
346 : * applied by the worker.
347 : *
348 : * TRANS_LEADER_SERIALIZE:
349 : * This action means that we are in the leader apply worker or table sync
350 : * worker. Changes are written to temporary files and then applied when the
351 : * final commit arrives.
352 : *
353 : * TRANS_LEADER_SEND_TO_PARALLEL:
354 : * This action means that we are in the leader apply worker and need to send
355 : * the changes to the parallel apply worker.
356 : *
357 : * TRANS_LEADER_PARTIAL_SERIALIZE:
358 : * This action means that we are in the leader apply worker and have sent some
359 : * changes directly to the parallel apply worker and the remaining changes are
360 : * serialized to a file, due to timeout while sending data. The parallel apply
361 : * worker will apply these serialized changes when the final commit arrives.
362 : *
363 : * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
364 : * serializing changes, the leader worker also needs to serialize the
365 : * STREAM_XXX message to a file, and wait for the parallel apply worker to
366 : * finish the transaction when processing the transaction finish command. So
367 : * this new action was introduced to keep the code and logic clear.
368 : *
369 : * TRANS_PARALLEL_APPLY:
370 : * This action means that we are in the parallel apply worker and changes of
371 : * the transaction are applied directly by the worker.
372 : */
373 : typedef enum
374 : {
375 : /* The action for non-streaming transactions. */
376 : TRANS_LEADER_APPLY, /* leader/tablesync worker applies directly */
377 :
378 : /* Actions for streaming transactions. */
379 : TRANS_LEADER_SERIALIZE, /* write to temp file, apply at commit */
380 : TRANS_LEADER_SEND_TO_PARALLEL, /* ship changes to parallel apply worker */
381 : TRANS_LEADER_PARTIAL_SERIALIZE, /* some sent to parallel worker, rest
 : * serialized to a file */
382 : TRANS_PARALLEL_APPLY, /* parallel apply worker applies directly */
383 : } TransApplyAction;
384 :
385 : /*
386 : * The phases involved in advancing the non-removable transaction ID.
387 : *
388 : * See comments atop worker.c for details of the transition between these
389 : * phases.
390 : */
391 : typedef enum
392 : {
393 : RDT_GET_CANDIDATE_XID, /* pick oldest active xid as candidate */
394 : RDT_REQUEST_PUBLISHER_STATUS, /* ask walsender for publisher status */
395 : RDT_WAIT_FOR_PUBLISHER_STATUS, /* wait for that status to arrive */
396 : RDT_WAIT_FOR_LOCAL_FLUSH, /* wait for local flush to pass remote LSN */
397 : RDT_STOP_CONFLICT_INFO_RETENTION, /* retention exceeded
 : * max_retention_duration */
398 : RDT_RESUME_CONFLICT_INFO_RETENTION, /* re-enable previously stopped
 : * retention */
399 : } RetainDeadTuplesPhase;
400 :
401 : /*
402 : * Critical information for managing phase transitions within the
403 : * RetainDeadTuplesPhase.
404 : */
405 : typedef struct RetainDeadTuplesData
406 : {
407 : RetainDeadTuplesPhase phase; /* current phase */
408 : XLogRecPtr remote_lsn; /* WAL write position on the publisher */
409 :
410 : /*
411 : * Oldest transaction ID that was in the commit phase on the publisher.
412 : * Use FullTransactionId to prevent issues with transaction ID wraparound,
413 : * where a new remote_oldestxid could falsely appear to originate from the
414 : * past and block advancement.
415 : */
416 : FullTransactionId remote_oldestxid;
417 :
418 : /*
419 : * Next transaction ID to be assigned on the publisher. Use
420 : * FullTransactionId for consistency and to allow straightforward
421 : * comparisons with remote_oldestxid.
422 : */
423 : FullTransactionId remote_nextxid;
424 :
425 : TimestampTz reply_time; /* when the publisher responds with status */
426 :
427 : /*
428 : * Publisher transaction ID that must be awaited to complete before
429 : * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
430 : * FullTransactionId for the same reason as remote_nextxid.
431 : */
432 : FullTransactionId remote_wait_for;
433 :
434 : TransactionId candidate_xid; /* candidate for the non-removable
435 : * transaction ID */
436 : TimestampTz flushpos_update_time; /* when the remote flush position was
437 : * updated in final phase
438 : * (RDT_WAIT_FOR_LOCAL_FLUSH) */
439 :
440 : long table_sync_wait_time; /* time spent waiting for table sync
441 : * to finish */
 : /* NOTE(review): table_sync_wait_time units look like ms (cf.
 : * xid_advance_interval below) — confirm against callers */
442 :
443 : /*
444 : * The following fields are used to determine the timing for the next
445 : * round of transaction ID advancement.
446 : */
447 : TimestampTz last_recv_time; /* when the last message was received */
448 : TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
449 : int xid_advance_interval; /* how much time (ms) to wait before
450 : * attempting to advance the
451 : * non-removable transaction ID */
452 : } RetainDeadTuplesData;
453 :
454 : /*
455 : * The minimum (100ms) and maximum (3 minutes) intervals for advancing
456 : * non-removable transaction IDs. The maximum interval is a bit arbitrary but
457 : * is sufficient to not cause any undue network traffic.
458 : */
459 : #define MIN_XID_ADVANCE_INTERVAL 100
460 : #define MAX_XID_ADVANCE_INTERVAL 180000
461 :
462 : /* errcontext tracker */
463 : static ApplyErrorCallbackArg apply_error_callback_arg =
464 : {
465 : .command = 0,
466 : .rel = NULL,
467 : .remote_attnum = -1,
468 : .remote_xid = InvalidTransactionId,
469 : .finish_lsn = InvalidXLogRecPtr,
470 : .origin_name = NULL,
471 : };
472 :
473 : ErrorContextCallback *apply_error_context_stack = NULL;
474 :
475 : MemoryContext ApplyMessageContext = NULL;
476 : MemoryContext ApplyContext = NULL;
477 :
478 : /* per stream context for streaming transactions */
479 : static MemoryContext LogicalStreamingContext = NULL;
480 :
481 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
482 :
483 : Subscription *MySubscription = NULL;
484 : static bool MySubscriptionValid = false;
485 :
486 : static List *on_commit_wakeup_workers_subids = NIL;
487 :
488 : bool in_remote_transaction = false;
489 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
490 :
491 : /* fields valid only when processing streamed transaction */
492 : static bool in_streamed_transaction = false;
493 :
494 : static TransactionId stream_xid = InvalidTransactionId;
495 :
496 : /*
497 : * The number of changes applied by parallel apply worker during one streaming
498 : * block.
499 : */
500 : static uint32 parallel_stream_nchanges = 0;
501 :
502 : /* Are we initializing an apply worker? */
503 : bool InitializingApplyWorker = false;
504 :
505 : /*
506 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
507 : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
508 : * Once we start skipping changes, we don't stop it until we skip all changes of
509 : * the transaction even if pg_subscription is updated and MySubscription->skiplsn
510 : * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
511 : * we don't skip receiving and spooling the changes since we decide whether or not
512 : * to skip applying the changes when starting to apply changes. The subskiplsn is
513 : * cleared after successfully skipping the transaction or applying non-empty
514 : * transaction. The latter prevents the mistakenly specified subskiplsn from
515 : * being left. Note that we cannot skip the streaming transactions when using
516 : * parallel apply workers because we cannot get the finish LSN before applying
517 : * the changes. So, we don't start parallel apply worker when finish LSN is set
518 : * by the user.
519 : */
520 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
521 : #define is_skipping_changes() (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))
522 :
523 : /* BufFile handle of the current streaming file */
524 : static BufFile *stream_fd = NULL;
525 :
526 : /*
527 : * The remote WAL position that has been applied and flushed locally. We record
528 : * and use this information both while sending feedback to the server and
529 : * advancing oldest_nonremovable_xid.
530 : */
531 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
532 :
 : /*
 : * Start position of one subtransaction's changes within the streamed
 : * transaction's changes BufFile. On a subxact abort, the file is
 : * truncated back to this position (see the STREAMED TRANSACTIONS notes
 : * at the top of this file).
 : */
533 : typedef struct SubXactInfo
534 : {
535 : TransactionId xid; /* XID of the subxact */
536 : int fileno; /* file number in the buffile */
537 : pgoff_t offset; /* offset in the file */
538 : } SubXactInfo;
539 :
540 : /* Sub-transaction data for the current streaming transaction */
541 : typedef struct ApplySubXactData
542 : {
543 : uint32 nsubxacts; /* number of sub-transactions */
544 : uint32 nsubxacts_max; /* current capacity of subxacts */
545 : TransactionId subxact_last; /* xid of the last sub-transaction */
546 : SubXactInfo *subxacts; /* growable array of sub-xact offsets in
 : * changes file (capacity nsubxacts_max) */
547 : } ApplySubXactData;
548 :
549 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
550 :
551 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
552 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
553 :
554 : /*
555 : * Information about subtransactions of a given toplevel transaction.
556 : */
557 : static void subxact_info_write(Oid subid, TransactionId xid);
558 : static void subxact_info_read(Oid subid, TransactionId xid);
559 : static void subxact_info_add(TransactionId xid);
560 : static inline void cleanup_subxact_info(void);
561 :
562 : /*
563 : * Serialize and deserialize changes for a toplevel transaction.
564 : */
565 : static void stream_open_file(Oid subid, TransactionId xid,
566 : bool first_segment);
567 : static void stream_write_change(char action, StringInfo s);
568 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
569 : static void stream_close_file(void);
570 :
571 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
572 :
573 : static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
574 : bool status_received);
575 : static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
576 : static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
577 : bool status_received);
578 : static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
579 : static void request_publisher_status(RetainDeadTuplesData *rdt_data);
580 : static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
581 : bool status_received);
582 : static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
583 : static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
584 : static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
585 : static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data);
586 : static bool update_retention_status(bool active);
587 : static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
588 : static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
589 : bool new_xid_found);
590 :
591 : static void apply_worker_exit(void);
592 :
593 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
594 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
595 : ResultRelInfo *relinfo,
596 : TupleTableSlot *remoteslot);
597 : static void apply_handle_update_internal(ApplyExecutionData *edata,
598 : ResultRelInfo *relinfo,
599 : TupleTableSlot *remoteslot,
600 : LogicalRepTupleData *newtup,
601 : Oid localindexoid);
602 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
603 : ResultRelInfo *relinfo,
604 : TupleTableSlot *remoteslot,
605 : Oid localindexoid);
606 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
607 : LogicalRepRelation *remoterel,
608 : Oid localidxoid,
609 : TupleTableSlot *remoteslot,
610 : TupleTableSlot **localslot);
611 : static bool FindDeletedTupleInLocalRel(Relation localrel,
612 : Oid localidxoid,
613 : TupleTableSlot *remoteslot,
614 : TransactionId *delete_xid,
615 : ReplOriginId *delete_origin,
616 : TimestampTz *delete_time);
617 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
618 : TupleTableSlot *remoteslot,
619 : LogicalRepTupleData *newtup,
620 : CmdType operation);
621 :
622 : /* Functions for skipping changes */
623 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
624 : static void stop_skipping_changes(void);
625 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
626 :
627 : /* Functions for apply error callback */
628 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
629 : static inline void reset_apply_error_context_info(void);
630 :
631 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
632 : ParallelApplyWorkerInfo **winfo);
633 :
634 : static void set_wal_receiver_timeout(void);
635 :
636 : static void on_exit_clear_xact_state(int code, Datum arg);
637 :
638 : /*
639 : * Form the origin name for the subscription.
640 : *
641 : * This is a common function for tablesync and other workers. Tablesync workers
642 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
643 : *
644 : * The generated name is "pg_<suboid>_<relid>" for tablesync workers and
645 : * "pg_<suboid>" for all other workers. If szoriginname is too small, the
646 : * name is silently truncated by snprintf.
647 : *
648 : * Return the name in the supplied buffer.
649 : */
646 : void
647 1393 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
648 : char *originname, Size szoriginname)
649 : {
650 1393 : if (OidIsValid(relid))
651 : {
652 : /* Replication origin name for tablesync workers. */
653 787 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
654 : }
655 : else
656 : {
657 : /* Replication origin name for non-tablesync workers. */
658 606 : snprintf(originname, szoriginname, "pg_%u", suboid);
659 : }
660 1393 : }
661 :
/*
 * Should this worker apply changes for given relation.
 *
 * This is mainly needed for initial relation data sync as that runs in
 * separate worker process running in parallel and we need some way to skip
 * changes coming to the leader apply worker during the sync of a table.
 *
 * Note we need to do smaller or equals comparison for SYNCDONE state because
 * it might hold position of end of initial slot consistent point WAL
 * record + 1 (ie start of next record) and next record can be COMMIT of
 * transaction we are now processing (which is what we set remote_final_lsn
 * to in apply_handle_begin).
 *
 * Note that for streaming transactions that are being applied in the parallel
 * apply worker, we disallow applying changes if the target table in the
 * subscription is not in the READY state, because we cannot decide whether to
 * apply the change as we won't know remote_final_lsn by that time.
 *
 * We already checked this in pa_can_start() before assigning the
 * streaming transaction to the parallel worker, but it also needs to be
 * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
 * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
 * while applying this transaction.
 */
static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
	switch (MyLogicalRepWorker->type)
	{
		case WORKERTYPE_TABLESYNC:
			/* A tablesync worker applies changes only for its own table. */
			return MyLogicalRepWorker->relid == rel->localreloid;

		case WORKERTYPE_PARALLEL_APPLY:
			/* We don't synchronize rel's that are in unknown state. */
			if (rel->state != SUBREL_STATE_READY &&
				rel->state != SUBREL_STATE_UNKNOWN)
				ereport(ERROR,
						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						 errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
								MySubscription->name),
						 errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));

			return rel->state == SUBREL_STATE_READY;

		case WORKERTYPE_APPLY:
			/* See the SYNCDONE note in the header comment above. */
			return (rel->state == SUBREL_STATE_READY ||
					(rel->state == SUBREL_STATE_SYNCDONE &&
					 rel->statelsn <= remote_final_lsn));

		case WORKERTYPE_SEQUENCESYNC:
			/* Should never happen. */
			elog(ERROR, "sequence synchronization worker is not expected to apply changes");
			break;

		case WORKERTYPE_UNKNOWN:
			/* Should never happen. */
			elog(ERROR, "Unknown worker type");
	}

	return false;				/* dummy for compiler */
}
723 :
/*
 * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
 *
 * Start a transaction, if this is the first step (else we keep using the
 * existing transaction).
 * Also provide a global snapshot and ensure we run in ApplyMessageContext.
 */
static void
begin_replication_step(void)
{
	SetCurrentStatementStartTimestamp();

	if (!IsTransactionState())
	{
		StartTransactionCommand();

		/*
		 * The subscription's catalog row may have changed since we last
		 * read it; pick up any such changes before applying this step.
		 */
		maybe_reread_subscription();
	}

	PushActiveSnapshot(GetTransactionSnapshot());

	MemoryContextSwitchTo(ApplyMessageContext);
}
746 :
/*
 * Finish up one step of a replication transaction.
 * Callers of begin_replication_step() must also call this.
 *
 * We don't close out the transaction here, but we should increment
 * the command counter to make the effects of this step visible.
 * (The transaction itself is finished by the commit/abort handlers.)
 */
static void
end_replication_step(void)
{
	PopActiveSnapshot();

	CommandCounterIncrement();
}
761 :
/*
 * Handle streamed transactions for both the leader apply worker and the
 * parallel apply workers.
 *
 * In the streaming case (receiving a block of the streamed transaction), for
 * serialize mode, simply redirect it to a file for the proper toplevel
 * transaction, and for parallel mode, the leader apply worker will send the
 * changes to parallel apply workers and the parallel apply worker will define
 * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
 * messages will be applied by both leader apply worker and parallel apply
 * workers).
 *
 * Returns true for streamed transactions (when the change is either serialized
 * to file or sent to parallel apply worker), false otherwise (regular mode or
 * needs to be processed by parallel apply worker).
 *
 * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
 * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
 * to a parallel apply worker.
 */
static bool
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
	TransactionId current_xid;
	ParallelApplyWorkerInfo *winfo;
	TransApplyAction apply_action;
	StringInfoData original_msg;

	apply_action = get_transaction_apply_action(stream_xid, &winfo);

	/* not in streaming mode */
	if (apply_action == TRANS_LEADER_APPLY)
		return false;

	Assert(TransactionIdIsValid(stream_xid));

	/*
	 * The parallel apply worker needs the xid in this message to decide
	 * whether to define a savepoint, so save the original message that has
	 * not moved the cursor after the xid. We will serialize this message to a
	 * file in PARTIAL_SERIALIZE mode.
	 */
	original_msg = *s;

	/*
	 * We should have received XID of the subxact as the first part of the
	 * message, so extract it.
	 */
	current_xid = pq_getmsgint(s, 4);

	if (!TransactionIdIsValid(current_xid))
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("invalid transaction ID in streamed replication transaction")));

	switch (apply_action)
	{
		case TRANS_LEADER_SERIALIZE:
			Assert(stream_fd);

			/* Add the new subxact to the array (unless already there). */
			subxact_info_add(current_xid);

			/* Write the change to the current file */
			stream_write_change(action, s);
			return true;

		case TRANS_LEADER_SEND_TO_PARALLEL:
			Assert(winfo);

			/*
			 * XXX The publisher side doesn't always send relation/type update
			 * messages after the streaming transaction, so also update the
			 * relation/type in leader apply worker. See function
			 * cleanup_rel_sync_cache.
			 */
			if (pa_send_data(winfo, s->len, s->data))
				return (action != LOGICAL_REP_MSG_RELATION &&
						action != LOGICAL_REP_MSG_TYPE);

			/*
			 * Switch to serialize mode when we are not able to send the
			 * change to parallel apply worker.
			 */
			pa_switch_to_partial_serialize(winfo, false);

			/* Intentional fallthrough: serialize this change instead. */
			pg_fallthrough;
		case TRANS_LEADER_PARTIAL_SERIALIZE:
			/* Write the unconsumed message (cursor before the xid). */
			stream_write_change(action, &original_msg);

			/* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
			return (action != LOGICAL_REP_MSG_RELATION &&
					action != LOGICAL_REP_MSG_TYPE);

		case TRANS_PARALLEL_APPLY:
			parallel_stream_nchanges += 1;

			/* Define a savepoint for a subxact if needed. */
			pa_start_subtrans(current_xid, stream_xid);
			return false;

		default:
			elog(ERROR, "unexpected apply action: %d", (int) apply_action);
			return false;		/* silence compiler warning */
	}
}
868 :
/*
 * Executor state preparation for evaluation of constraint expressions,
 * indexes and triggers for the specified relation.
 *
 * Note that the caller must open and close any indexes to be updated.
 *
 * The returned structure must be released with finish_edata().
 */
static ApplyExecutionData *
create_edata_for_relation(LogicalRepRelMapEntry *rel)
{
	ApplyExecutionData *edata;
	EState	   *estate;
	RangeTblEntry *rte;
	List	   *perminfos = NIL;
	ResultRelInfo *resultRelInfo;

	edata = palloc0_object(ApplyExecutionData);
	edata->targetRel = rel;

	edata->estate = estate = CreateExecutorState();

	/* Build a one-entry range table describing the target relation. */
	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_RELATION;
	rte->relid = RelationGetRelid(rel->localrel);
	rte->relkind = rel->localrel->rd_rel->relkind;
	rte->rellockmode = AccessShareLock;

	addRTEPermissionInfo(&perminfos, rte);

	ExecInitRangeTable(estate, list_make1(rte), perminfos,
					   bms_make_singleton(1));

	edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);

	/*
	 * Use Relation opened by logicalrep_rel_open() instead of opening it
	 * again.
	 */
	InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);

	/*
	 * We put the ResultRelInfo in the es_opened_result_relations list, even
	 * though we don't populate the es_result_relations array. That's a bit
	 * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
	 *
	 * ExecOpenIndices() is not called here either, each execution path doing
	 * an apply operation being responsible for that.
	 */
	estate->es_opened_result_relations =
		lappend(estate->es_opened_result_relations, resultRelInfo);

	estate->es_output_cid = GetCurrentCommandId(true);

	/* Prepare to catch AFTER triggers. */
	AfterTriggerBeginQuery();

	/* other fields of edata remain NULL for now */

	return edata;
}
928 :
/*
 * Finish any operations related to the executor state created by
 * create_edata_for_relation().
 */
static void
finish_edata(ApplyExecutionData *edata)
{
	EState	   *estate = edata->estate;

	/* Handle any queued AFTER triggers. */
	AfterTriggerEndQuery(estate);

	/* Shut down tuple routing, if any was done. */
	if (edata->proute)
		ExecCleanupTupleRouting(edata->mtstate, edata->proute);

	/*
	 * Cleanup. It might seem that we should call ExecCloseResultRelations()
	 * here, but we intentionally don't. It would close the rel we added to
	 * es_opened_result_relations above, which is wrong because we took no
	 * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
	 * any other relations opened during execution.
	 */
	ExecResetTupleTable(estate->es_tupleTable, false);
	FreeExecutorState(estate);
	pfree(edata);
}
956 :
957 : /*
958 : * Executes default values for columns for which we can't map to remote
959 : * relation columns.
960 : *
961 : * This allows us to support tables which have more columns on the downstream
962 : * than on the upstream.
963 : */
964 : static void
965 76060 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
966 : TupleTableSlot *slot)
967 : {
968 76060 : TupleDesc desc = RelationGetDescr(rel->localrel);
969 76060 : int num_phys_attrs = desc->natts;
970 : int i;
971 : int attnum,
972 76060 : num_defaults = 0;
973 : int *defmap;
974 : ExprState **defexprs;
975 : ExprContext *econtext;
976 :
977 76060 : econtext = GetPerTupleExprContext(estate);
978 :
979 : /* We got all the data via replication, no need to evaluate anything. */
980 76060 : if (num_phys_attrs == rel->remoterel.natts)
981 35915 : return;
982 :
983 40145 : defmap = palloc_array(int, num_phys_attrs);
984 40145 : defexprs = palloc_array(ExprState *, num_phys_attrs);
985 :
986 : Assert(rel->attrmap->maplen == num_phys_attrs);
987 210663 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
988 : {
989 170518 : CompactAttribute *cattr = TupleDescCompactAttr(desc, attnum);
990 : Expr *defexpr;
991 :
992 170518 : if (cattr->attisdropped || cattr->attgenerated)
993 9 : continue;
994 :
995 170509 : if (rel->attrmap->attnums[attnum] >= 0)
996 92268 : continue;
997 :
998 78241 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
999 :
1000 78241 : if (defexpr != NULL)
1001 : {
1002 : /* Run the expression through planner */
1003 70131 : defexpr = expression_planner(defexpr);
1004 :
1005 : /* Initialize executable expression in copycontext */
1006 70131 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1007 70131 : defmap[num_defaults] = attnum;
1008 70131 : num_defaults++;
1009 : }
1010 : }
1011 :
1012 110276 : for (i = 0; i < num_defaults; i++)
1013 70131 : slot->tts_values[defmap[i]] =
1014 70131 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1015 : }
1016 :
/*
 * Store tuple data into slot.
 *
 * Incoming data can be either text or binary format.
 */
static void
slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
				LogicalRepTupleData *tupleData)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;

	ExecClearTuple(slot);

	/* Call the "in" function for each non-dropped, non-null attribute */
	Assert(natts == rel->attrmap->maplen);
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap->attnums[i];

		if (!att->attisdropped && remoteattnum >= 0)
		{
			StringInfo	colvalue = &tupleData->colvalues[remoteattnum];

			Assert(remoteattnum < tupleData->ncols);

			/* Set attnum for error callback */
			apply_error_callback_arg.remote_attnum = remoteattnum;

			if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
			{
				Oid			typinput;
				Oid			typioparam;

				getTypeInputInfo(att->atttypid, &typinput, &typioparam);
				slot->tts_values[i] =
					OidInputFunctionCall(typinput, colvalue->data,
										 typioparam, att->atttypmod);
				slot->tts_isnull[i] = false;
			}
			else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
			{
				Oid			typreceive;
				Oid			typioparam;

				/*
				 * In some code paths we may be asked to re-parse the same
				 * tuple data. Reset the StringInfo's cursor so that works.
				 */
				colvalue->cursor = 0;

				getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
				slot->tts_values[i] =
					OidReceiveFunctionCall(typreceive, colvalue,
										   typioparam, att->atttypmod);

				/* Trouble if it didn't eat the whole buffer */
				if (colvalue->cursor != colvalue->len)
					ereport(ERROR,
							(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
							 errmsg("incorrect binary data format in logical replication column %d",
									remoteattnum + 1)));
				slot->tts_isnull[i] = false;
			}
			else
			{
				/*
				 * NULL value from remote. (We don't expect to see
				 * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
				 * NULL.)
				 */
				slot->tts_values[i] = (Datum) 0;
				slot->tts_isnull[i] = true;
			}

			/* Reset attnum for error callback */
			apply_error_callback_arg.remote_attnum = -1;
		}
		else
		{
			/*
			 * We assign NULL to dropped attributes and missing values
			 * (missing values should be later filled using
			 * slot_fill_defaults).
			 */
			slot->tts_values[i] = (Datum) 0;
			slot->tts_isnull[i] = true;
		}
	}

	ExecStoreVirtualTuple(slot);
}
1110 :
/*
 * Replace updated columns with data from the LogicalRepTupleData struct.
 * This is somewhat similar to heap_modify_tuple but also calls the type
 * input functions on the user data.
 *
 * "slot" is filled with a copy of the tuple in "srcslot", replacing
 * columns provided in "tupleData" and leaving others as-is.
 *
 * Caution: unreplaced pass-by-ref columns in "slot" will point into the
 * storage for "srcslot". This is OK for current usage, but someday we may
 * need to materialize "slot" at the end to make it independent of "srcslot".
 */
static void
slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
				 LogicalRepRelMapEntry *rel,
				 LogicalRepTupleData *tupleData)
{
	int			natts = slot->tts_tupleDescriptor->natts;
	int			i;

	/* We'll fill "slot" with a virtual tuple, so we must start with ... */
	ExecClearTuple(slot);

	/*
	 * Copy all the column data from srcslot, so that we'll have valid values
	 * for unreplaced columns.
	 */
	Assert(natts == srcslot->tts_tupleDescriptor->natts);
	slot_getallattrs(srcslot);
	memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
	memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));

	/* Call the "in" function for each replaced attribute */
	Assert(natts == rel->attrmap->maplen);
	for (i = 0; i < natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
		int			remoteattnum = rel->attrmap->attnums[i];

		/* Columns with no remote counterpart keep their srcslot values. */
		if (remoteattnum < 0)
			continue;

		Assert(remoteattnum < tupleData->ncols);

		if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
		{
			StringInfo	colvalue = &tupleData->colvalues[remoteattnum];

			/* Set attnum for error callback */
			apply_error_callback_arg.remote_attnum = remoteattnum;

			if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
			{
				Oid			typinput;
				Oid			typioparam;

				getTypeInputInfo(att->atttypid, &typinput, &typioparam);
				slot->tts_values[i] =
					OidInputFunctionCall(typinput, colvalue->data,
										 typioparam, att->atttypmod);
				slot->tts_isnull[i] = false;
			}
			else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
			{
				Oid			typreceive;
				Oid			typioparam;

				/*
				 * In some code paths we may be asked to re-parse the same
				 * tuple data. Reset the StringInfo's cursor so that works.
				 */
				colvalue->cursor = 0;

				getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
				slot->tts_values[i] =
					OidReceiveFunctionCall(typreceive, colvalue,
										   typioparam, att->atttypmod);

				/* Trouble if it didn't eat the whole buffer */
				if (colvalue->cursor != colvalue->len)
					ereport(ERROR,
							(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
							 errmsg("incorrect binary data format in logical replication column %d",
									remoteattnum + 1)));
				slot->tts_isnull[i] = false;
			}
			else
			{
				/* must be LOGICALREP_COLUMN_NULL */
				slot->tts_values[i] = (Datum) 0;
				slot->tts_isnull[i] = true;
			}

			/* Reset attnum for error callback */
			apply_error_callback_arg.remote_attnum = -1;
		}
	}

	/* And finally, declare that "slot" contains a valid virtual tuple */
	ExecStoreVirtualTuple(slot);
}
1212 :
/*
 * Handle BEGIN message.
 */
static void
apply_handle_begin(StringInfo s)
{
	LogicalRepBeginData begin_data;

	/* There must not be an active streaming transaction. */
	Assert(!TransactionIdIsValid(stream_xid));

	logicalrep_read_begin(s, &begin_data);
	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);

	/*
	 * Remember the commit LSN of this transaction; it is consulted by
	 * should_apply_changes_for_rel() for SYNCDONE relations.
	 */
	remote_final_lsn = begin_data.final_lsn;

	maybe_start_skipping_changes(begin_data.final_lsn);

	in_remote_transaction = true;

	pgstat_report_activity(STATE_RUNNING, NULL);
}
1235 :
/*
 * Handle COMMIT message.
 *
 * TODO, support tracking of multiple origins
 */
static void
apply_handle_commit(StringInfo s)
{
	LogicalRepCommitData commit_data;

	logicalrep_read_commit(s, &commit_data);

	/* Cross-check against the LSN announced by the BEGIN message. */
	if (commit_data.commit_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
								 LSN_FORMAT_ARGS(commit_data.commit_lsn),
								 LSN_FORMAT_ARGS(remote_final_lsn))));

	apply_handle_commit_internal(&commit_data);

	/*
	 * Process any tables that are being synchronized in parallel, as well as
	 * any newly added tables or sequences.
	 */
	ProcessSyncingRelations(commit_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
1266 :
/*
 * Handle BEGIN PREPARE message.
 */
static void
apply_handle_begin_prepare(StringInfo s)
{
	LogicalRepPreparedTxnData begin_data;

	/* Tablesync should never receive prepare. */
	if (am_tablesync_worker())
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));

	/* There must not be an active streaming transaction. */
	Assert(!TransactionIdIsValid(stream_xid));

	logicalrep_read_begin_prepare(s, &begin_data);
	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);

	/* For two-phase transactions the prepare LSN plays the final-LSN role. */
	remote_final_lsn = begin_data.prepare_lsn;

	maybe_start_skipping_changes(begin_data.prepare_lsn);

	in_remote_transaction = true;

	pgstat_report_activity(STATE_RUNNING, NULL);
}
1295 :
/*
 * Common function to prepare the GID.
 *
 * Shared by the non-streamed (apply_handle_prepare) and streamed
 * (apply_handle_stream_prepare) paths.
 */
static void
apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
{
	char		gid[GIDSIZE];

	/*
	 * Compute unique GID for two_phase transactions. We don't use GID of
	 * prepared transaction sent by server as that can lead to deadlock when
	 * we have multiple subscriptions from same node point to publications on
	 * the same node. See comments atop worker.c
	 */
	TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
						   gid, sizeof(gid));

	/*
	 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
	 * called within the PrepareTransactionBlock below.
	 */
	if (!IsTransactionBlock())
	{
		BeginTransactionBlock();
		CommitTransactionCommand(); /* Completes the preceding Begin command. */
	}

	/*
	 * Update origin state so we can restart streaming from correct position
	 * in case of crash.
	 */
	replorigin_xact_state.origin_lsn = prepare_data->end_lsn;
	replorigin_xact_state.origin_timestamp = prepare_data->prepare_time;

	PrepareTransactionBlock(gid);
}
1332 :
/*
 * Handle PREPARE message.
 */
static void
apply_handle_prepare(StringInfo s)
{
	LogicalRepPreparedTxnData prepare_data;

	logicalrep_read_prepare(s, &prepare_data);

	/* Cross-check against the LSN announced by BEGIN PREPARE. */
	if (prepare_data.prepare_lsn != remote_final_lsn)
		ereport(ERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
								 LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
								 LSN_FORMAT_ARGS(remote_final_lsn))));

	/*
	 * Unlike commit, here, we always prepare the transaction even though no
	 * change has happened in this transaction or all changes are skipped. It
	 * is done this way because at commit prepared time, we won't know whether
	 * we have skipped preparing a transaction because of those reasons.
	 *
	 * XXX, We can optimize such that at commit prepared time, we first check
	 * whether we have prepared the transaction or not but that doesn't seem
	 * worthwhile because such cases shouldn't be common.
	 */
	begin_replication_step();

	apply_handle_prepare_internal(&prepare_data);

	end_replication_step();
	CommitTransactionCommand();
	pgstat_report_stat(false);

	/*
	 * It is okay not to set the local_end LSN for the prepare because we
	 * always flush the prepare record. So, we can send the acknowledgment of
	 * the remote_end LSN as soon as prepare is finished.
	 *
	 * XXX For the sake of consistency with commit, we could have set it with
	 * the LSN of prepare but as of now we don't track that value similar to
	 * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
	 * it.
	 */
	store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);

	in_remote_transaction = false;

	/*
	 * Process any tables that are being synchronized in parallel, as well as
	 * any newly added tables or sequences.
	 */
	ProcessSyncingRelations(prepare_data.end_lsn);

	/*
	 * Since we have already prepared the transaction, in a case where the
	 * server crashes before clearing the subskiplsn, it will be left but the
	 * transaction won't be resent. But that's okay because it's a rare case
	 * and the subskiplsn will be cleared when finishing the next transaction.
	 */
	stop_skipping_changes();
	clear_subscription_skip_lsn(prepare_data.prepare_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
1400 :
/*
 * Handle a COMMIT PREPARED of a previously PREPARED transaction.
 *
 * Note that we don't need to wait here if the transaction was prepared in a
 * parallel apply worker. In that case, we have already waited for the prepare
 * to finish in apply_handle_stream_prepare() which will ensure all the
 * operations in that transaction have happened in the subscriber, so no
 * concurrent transaction can cause deadlock or transaction dependency issues.
 */
static void
apply_handle_commit_prepared(StringInfo s)
{
	LogicalRepCommitPreparedTxnData prepare_data;
	char		gid[GIDSIZE];

	logicalrep_read_commit_prepared(s, &prepare_data);
	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);

	/* Compute GID for two_phase transactions. */
	TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
						   gid, sizeof(gid));

	/* There is no transaction when COMMIT PREPARED is called */
	begin_replication_step();

	/*
	 * Update origin state so we can restart streaming from correct position
	 * in case of crash.
	 */
	replorigin_xact_state.origin_lsn = prepare_data.end_lsn;
	replorigin_xact_state.origin_timestamp = prepare_data.commit_time;

	FinishPreparedTransaction(gid, true);
	end_replication_step();
	CommitTransactionCommand();
	pgstat_report_stat(false);

	store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
	in_remote_transaction = false;

	/*
	 * Process any tables that are being synchronized in parallel, as well as
	 * any newly added tables or sequences.
	 */
	ProcessSyncingRelations(prepare_data.end_lsn);

	clear_subscription_skip_lsn(prepare_data.end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
1452 :
/*
 * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
 *
 * Note that we don't need to wait here if the transaction was prepared in a
 * parallel apply worker. In that case, we have already waited for the prepare
 * to finish in apply_handle_stream_prepare() which will ensure all the
 * operations in that transaction have happened in the subscriber, so no
 * concurrent transaction can cause deadlock or transaction dependency issues.
 */
static void
apply_handle_rollback_prepared(StringInfo s)
{
	LogicalRepRollbackPreparedTxnData rollback_data;
	char		gid[GIDSIZE];

	logicalrep_read_rollback_prepared(s, &rollback_data);
	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);

	/* Compute GID for two_phase transactions. */
	TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
						   gid, sizeof(gid));

	/*
	 * It is possible that we haven't received prepare because it occurred
	 * before walsender reached a consistent point or the two_phase was still
	 * not enabled by that time, so in such cases, we need to skip rollback
	 * prepared.
	 */
	if (LookupGXact(gid, rollback_data.prepare_end_lsn,
					rollback_data.prepare_time))
	{
		/*
		 * Update origin state so we can restart streaming from correct
		 * position in case of crash.
		 */
		replorigin_xact_state.origin_lsn = rollback_data.rollback_end_lsn;
		replorigin_xact_state.origin_timestamp = rollback_data.rollback_time;

		/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
		begin_replication_step();
		FinishPreparedTransaction(gid, false);
		end_replication_step();
		CommitTransactionCommand();

		clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
	}

	pgstat_report_stat(false);

	/*
	 * It is okay not to set the local_end LSN for the rollback of prepared
	 * transaction because we always flush the WAL record for it. See
	 * apply_handle_prepare.
	 */
	store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
	in_remote_transaction = false;

	/*
	 * Process any tables that are being synchronized in parallel, as well as
	 * any newly added tables or sequences.
	 */
	ProcessSyncingRelations(rollback_data.rollback_end_lsn);

	pgstat_report_activity(STATE_IDLE, NULL);
	reset_apply_error_context_info();
}
1519 :
1520 : /*
1521 : * Handle STREAM PREPARE.
      : *
      : * A streamed (large) transaction whose changes arrived in chunks is now
      : * marked as prepared.  Routing is decided by get_transaction_apply_action():
      : *
      : * - TRANS_LEADER_APPLY: changes were serialized to a file; replay them
      : *   here, then prepare.
      : * - TRANS_LEADER_SEND_TO_PARALLEL: forward the message to the parallel
      : *   apply worker; on queue-send failure fall through to partial
      : *   serialize mode.
      : * - TRANS_LEADER_PARTIAL_SERIALIZE: append the message to the spool
      : *   file for the parallel apply worker to consume.
      : * - TRANS_PARALLEL_APPLY: we are the parallel apply worker; prepare the
      : *   transaction ourselves and mark it finished for the leader.
1522 : */
1523 : static void
1524 15 : apply_handle_stream_prepare(StringInfo s)
1525 : {
1526 : LogicalRepPreparedTxnData prepare_data;
1527 : ParallelApplyWorkerInfo *winfo;
1528 : TransApplyAction apply_action;
1529 :
1530 : /* Save the message before it is consumed. */
1531 15 : StringInfoData original_msg = *s;
1532 :
1533 15 : if (in_streamed_transaction)
1534 0 : ereport(ERROR,
1535 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1536 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1537 :
1538 : /* Tablesync should never receive prepare. */
1539 15 : if (am_tablesync_worker())
1540 0 : ereport(ERROR,
1541 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1542 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1543 :
1544 15 : logicalrep_read_stream_prepare(s, &prepare_data);
1545 15 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1546 :
1547 15 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1548 :
1549 15 : switch (apply_action)
1550 : {
1551 5 : case TRANS_LEADER_APPLY:
1552 :
1553 : /*
1554 : * The transaction has been serialized to file, so replay all the
1555 : * spooled operations.
1556 : */
1557 5 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1558 : prepare_data.xid, prepare_data.prepare_lsn);
1559 :
1560 : /* Mark the transaction as prepared. */
1561 5 : apply_handle_prepare_internal(&prepare_data);
1562 :
1563 5 : CommitTransactionCommand();
1564 :
1565 : /*
1566 : * It is okay not to set the local_end LSN for the prepare because
1567 : * we always flush the prepare record. See apply_handle_prepare.
1568 : */
1569 5 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1570 :
1571 5 : in_remote_transaction = false;
1572 :
1573 : /* Unlink the files with serialized changes and subxact info. */
1574 5 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1575 :
1576 5 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1577 5 : break;
1578 :
1579 4 : case TRANS_LEADER_SEND_TO_PARALLEL:
1580 : Assert(winfo);
1581 :
1582 4 : if (pa_send_data(winfo, s->len, s->data))
1583 : {
1584 : /* Finish processing the streaming transaction. */
1585 4 : pa_xact_finish(winfo, prepare_data.end_lsn);
1586 3 : break;
1587 : }
1588 :
1589 : /*
1590 : * Switch to serialize mode when we are not able to send the
1591 : * change to parallel apply worker.
1592 : */
1593 0 : pa_switch_to_partial_serialize(winfo, true);
1594 :
1595 : pg_fallthrough;
1596 1 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1597 : Assert(winfo);
1598 :
1599 1 : stream_open_and_write_change(prepare_data.xid,
1600 : LOGICAL_REP_MSG_STREAM_PREPARE,
1601 : &original_msg);
1602 :
1603 1 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1604 :
1605 : /* Finish processing the streaming transaction. */
1606 1 : pa_xact_finish(winfo, prepare_data.end_lsn);
1607 1 : break;
1608 :
1609 5 : case TRANS_PARALLEL_APPLY:
1610 :
1611 : /*
1612 : * If the parallel apply worker is applying spooled messages then
1613 : * close the file before preparing.
1614 : */
1615 5 : if (stream_fd)
1616 1 : stream_close_file();
1617 :
1618 5 : begin_replication_step();
1619 :
1620 : /* Mark the transaction as prepared. */
1621 5 : apply_handle_prepare_internal(&prepare_data);
1622 :
1623 5 : end_replication_step();
1624 :
1625 5 : CommitTransactionCommand();
1626 :
1627 : /*
1628 : * It is okay not to set the local_end LSN for the prepare because
1629 : * we always flush the prepare record. See apply_handle_prepare.
1630 : */
1631 4 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1632 :
1633 4 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1634 4 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1635 :
1636 4 : pa_reset_subtrans();
1637 :
1638 4 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1639 4 : break;
1640 :
1641 0 : default:
1642 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1643 : break;
1644 : }
1645 :
1646 13 : pgstat_report_stat(false);
1647 :
1648 : /*
1649 : * Process any tables that are being synchronized in parallel, as well as
1650 : * any newly added tables or sequences.
1651 : */
1652 13 : ProcessSyncingRelations(prepare_data.end_lsn);
1653 :
1654 : /*
1655 : * Similar to prepare case, the subskiplsn could be left in a case of
1656 : * server crash but it's okay. See the comments in apply_handle_prepare().
1657 : */
1658 13 : stop_skipping_changes();
1659 13 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1660 :
1661 13 : pgstat_report_activity(STATE_IDLE, NULL);
1662 :
1663 13 : reset_apply_error_context_info();
1664 13 : }
1665 :
1666 : /*
1667 : * Handle ORIGIN message.
1668 : *
1669 : * TODO, support tracking of multiple origins
      : *
      : * Note: apart from the ordering check below, the message payload in 's'
      : * is not consumed here; the origin information is otherwise ignored.
1670 : */
1671 : static void
1672 7 : apply_handle_origin(StringInfo s)
1673 : {
1674 : /*
1675 : * ORIGIN message can only come inside streaming transaction or inside
1676 : * remote transaction and before any actual writes.
1677 : */
1678 7 : if (!in_streamed_transaction &&
1679 10 : (!in_remote_transaction ||
1680 5 : (IsTransactionState() && !am_tablesync_worker())))
1681 0 : ereport(ERROR,
1682 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1683 : errmsg_internal("ORIGIN message sent out of order")));
1684 7 : }
1685 :
1686 : /*
1687 : * Initialize fileset (if not already done).
1688 : *
1689 : * Create a new file when first_segment is true, otherwise open the existing
1690 : * file.
      : *
      : * xid - remote toplevel transaction whose spool file to open.
      : * first_segment - true for the first stream chunk of this xid; when
      : * false we additionally read back the previously written subxact info.
      : *
      : * Runs inside a replication step (transaction), started here and ended
      : * before returning; callers commit it later (see stream_stop_internal).
1691 : */
1692 : void
1693 363 : stream_start_internal(TransactionId xid, bool first_segment)
1694 : {
1695 363 : begin_replication_step();
1696 :
1697 : /*
1698 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1699 : * used for the entire duration of the worker so create it in a permanent
1700 : * context. We create this on the very first streaming message from any
1701 : * transaction and then use it for this and other streaming transactions.
1702 : * Now, we could create a fileset at the start of the worker as well but
1703 : * then we won't be sure that it will ever be used.
1704 : */
1705 363 : if (!MyLogicalRepWorker->stream_fileset)
1706 : {
1707 : MemoryContext oldctx;
1708 :
1709 14 : oldctx = MemoryContextSwitchTo(ApplyContext);
1710 :
1711 14 : MyLogicalRepWorker->stream_fileset = palloc_object(FileSet);
1712 14 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1713 :
1714 14 : MemoryContextSwitchTo(oldctx);
1715 : }
1716 :
1717 : /* Open the spool file for this transaction. */
1718 363 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1719 :
1720 : /* If this is not the first segment, open existing subxact file. */
1721 363 : if (!first_segment)
1722 331 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1723 :
1724 363 : end_replication_step();
1725 363 : }
1726 :
1727 : /*
1728 : * Handle STREAM START message.
      : *
      : * Begins (or resumes) a streamed chunk for one remote transaction.
      : * Depending on get_transaction_apply_action():
      : *
      : * - TRANS_LEADER_SERIALIZE: spool the chunk's changes to a file.
      : * - TRANS_LEADER_SEND_TO_PARALLEL: hand the chunk to a parallel apply
      : *   worker (fall through to partial serialize if the send fails).
      : * - TRANS_LEADER_PARTIAL_SERIALIZE: write the chunk to the spool file
      : *   that the parallel apply worker will drain.
      : * - TRANS_PARALLEL_APPLY: we are the parallel apply worker receiving
      : *   the chunk; on the first segment take the transaction lock and
      : *   wake the leader.
1729 : */
1730 : static void
1731 843 : apply_handle_stream_start(StringInfo s)
1732 : {
1733 : bool first_segment;
1734 : ParallelApplyWorkerInfo *winfo;
1735 : TransApplyAction apply_action;
1736 :
1737 : /* Save the message before it is consumed. */
1738 843 : StringInfoData original_msg = *s;
1739 :
1740 843 : if (in_streamed_transaction)
1741 0 : ereport(ERROR,
1742 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1743 : errmsg_internal("duplicate STREAM START message")));
1744 :
1745 : /* There must not be an active streaming transaction. */
1746 : Assert(!TransactionIdIsValid(stream_xid));
1747 :
1748 : /* notify handle methods we're processing a remote transaction */
1749 843 : in_streamed_transaction = true;
1750 :
1751 : /* extract XID of the top-level transaction */
1752 843 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1753 :
1754 843 : if (!TransactionIdIsValid(stream_xid))
1755 0 : ereport(ERROR,
1756 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1757 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1758 :
1759 843 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1760 :
1761 : /* Try to allocate a worker for the streaming transaction. */
1762 843 : if (first_segment)
1763 86 : pa_allocate_worker(stream_xid);
1764 :
1765 843 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1766 :
1767 843 : switch (apply_action)
1768 : {
1769 343 : case TRANS_LEADER_SERIALIZE:
1770 :
1771 : /*
1772 : * Function stream_start_internal starts a transaction. This
1773 : * transaction will be committed on the stream stop unless it is a
1774 : * tablesync worker in which case it will be committed after
1775 : * processing all the messages. We need this transaction for
1776 : * handling the BufFile, used for serializing the streaming data
1777 : * and subxact info.
1778 : */
1779 343 : stream_start_internal(stream_xid, first_segment);
1780 343 : break;
1781 :
1782 244 : case TRANS_LEADER_SEND_TO_PARALLEL:
1783 : Assert(winfo);
1784 :
1785 : /*
1786 : * Once we start serializing the changes, the parallel apply
1787 : * worker will wait for the leader to release the stream lock
1788 : * until the end of the transaction. So, we don't need to release
1789 : * the lock or increment the stream count in that case.
1790 : */
1791 244 : if (pa_send_data(winfo, s->len, s->data))
1792 : {
1793 : /*
1794 : * Unlock the shared object lock so that the parallel apply
1795 : * worker can continue to receive changes.
1796 : */
1797 240 : if (!first_segment)
1798 215 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1799 :
1800 : /*
1801 : * Increment the number of streaming blocks waiting to be
1802 : * processed by parallel apply worker.
1803 : */
1804 240 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1805 :
1806 : /* Cache the parallel apply worker for this transaction. */
1807 240 : pa_set_stream_apply_worker(winfo);
1808 240 : break;
1809 : }
1810 :
1811 : /*
1812 : * Switch to serialize mode when we are not able to send the
1813 : * change to parallel apply worker.
1814 : */
1815 4 : pa_switch_to_partial_serialize(winfo, !first_segment);
1816 :
1817 : pg_fallthrough;
1818 15 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1819 : Assert(winfo);
1820 :
1821 : /*
1822 : * Open the spool file unless it was already opened when switching
1823 : * to serialize mode. The transaction started in
1824 : * stream_start_internal will be committed on the stream stop.
1825 : */
1826 15 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1827 11 : stream_start_internal(stream_xid, first_segment);
1828 :
1829 15 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1830 :
1831 : /* Cache the parallel apply worker for this transaction. */
1832 15 : pa_set_stream_apply_worker(winfo);
1833 15 : break;
1834 :
1835 245 : case TRANS_PARALLEL_APPLY:
1836 245 : if (first_segment)
1837 : {
1838 : /* Hold the lock until the end of the transaction. */
1839 29 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1840 29 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1841 :
1842 : /*
1843 : * Signal the leader apply worker, as it may be waiting for
1844 : * us.
1845 : */
1846 29 : logicalrep_worker_wakeup(WORKERTYPE_APPLY,
1847 29 : MyLogicalRepWorker->subid, InvalidOid);
1848 : }
1849 :
1850 245 : parallel_stream_nchanges = 0;
1851 245 : break;
1852 :
1853 0 : default:
1854 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1855 : break;
1856 : }
1857 :
1858 843 : pgstat_report_activity(STATE_RUNNING, NULL);
1859 843 : }
1860 :
1861 : /*
1862 : * Update the information about subxacts and close the file.
1863 : *
1864 : * This function should be called when the stream_start_internal function has
1865 : * been called.
      : *
      : * xid - remote toplevel transaction whose subxact info and spool file
      : * are being closed out.  Commits the per-stream transaction started by
      : * stream_start_internal and resets the per-stream memory context.
1866 : */
1867 : void
1868 363 : stream_stop_internal(TransactionId xid)
1869 : {
1870 : /*
1871 : * Serialize information about subxacts for the toplevel transaction, then
1872 : * close the stream messages spool file.
1873 : */
1874 363 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1875 363 : stream_close_file();
1876 :
1877 : /* We must be in a valid transaction state */
1878 : Assert(IsTransactionState());
1879 :
1880 : /* Commit the per-stream transaction */
1881 363 : CommitTransactionCommand();
1882 :
1883 : /* Reset per-stream context */
1884 363 : MemoryContextReset(LogicalStreamingContext);
1885 363 : }
1886 :
1887 : /*
1888 : * Handle STREAM STOP message.
      : *
      : * Ends the current streamed chunk.  The leader either closes its spool
      : * file (serialize modes) or forwards the message to the parallel apply
      : * worker; a parallel apply worker decrements its pending-chunk count
      : * and may block waiting for the next chunk.  Afterwards the worker is
      : * no longer inside a streamed transaction.
1889 : */
1890 : static void
1891 842 : apply_handle_stream_stop(StringInfo s)
1892 : {
1893 : ParallelApplyWorkerInfo *winfo;
1894 : TransApplyAction apply_action;
1895 :
1896 842 : if (!in_streamed_transaction)
1897 0 : ereport(ERROR,
1898 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1899 : errmsg_internal("STREAM STOP message without STREAM START")));
1900 :
1901 842 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1902 :
1903 842 : switch (apply_action)
1904 : {
1905 343 : case TRANS_LEADER_SERIALIZE:
1906 343 : stream_stop_internal(stream_xid);
1907 343 : break;
1908 :
1909 240 : case TRANS_LEADER_SEND_TO_PARALLEL:
1910 : Assert(winfo);
1911 :
1912 : /*
1913 : * Lock before sending the STREAM_STOP message so that the leader
1914 : * can hold the lock first and the parallel apply worker will wait
1915 : * for leader to release the lock. See Locking Considerations atop
1916 : * applyparallelworker.c.
1917 : */
1918 240 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1919 :
1920 240 : if (pa_send_data(winfo, s->len, s->data))
1921 : {
1922 240 : pa_set_stream_apply_worker(NULL);
1923 240 : break;
1924 : }
1925 :
1926 : /*
1927 : * Switch to serialize mode when we are not able to send the
1928 : * change to parallel apply worker.
1929 : */
1930 0 : pa_switch_to_partial_serialize(winfo, true);
1931 :
1932 : pg_fallthrough;
1933 15 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1934 15 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1935 15 : stream_stop_internal(stream_xid);
1936 15 : pa_set_stream_apply_worker(NULL);
1937 15 : break;
1938 :
1939 244 : case TRANS_PARALLEL_APPLY:
1940 244 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1941 : parallel_stream_nchanges);
1942 :
1943 : /*
1944 : * By the time parallel apply worker is processing the changes in
1945 : * the current streaming block, the leader apply worker may have
1946 : * sent multiple streaming blocks. This can lead to parallel apply
1947 : * worker start waiting even when there are more chunk of streams
1948 : * in the queue. So, try to lock only if there is no message left
1949 : * in the queue. See Locking Considerations atop
1950 : * applyparallelworker.c.
1951 : *
1952 : * Note that here we have a race condition where we can start
1953 : * waiting even when there are pending streaming chunks. This can
1954 : * happen if the leader sends another streaming block and acquires
1955 : * the stream lock again after the parallel apply worker checks
1956 : * that there is no pending streaming block and before it actually
1957 : * starts waiting on a lock. We can handle this case by not
1958 : * allowing the leader to increment the stream block count during
1959 : * the time parallel apply worker acquires the lock but it is not
1960 : * clear whether that is worth the complexity.
1961 : *
1962 : * Now, if this missed chunk contains rollback to savepoint, then
1963 : * there is a risk of deadlock which probably shouldn't happen
1964 : * after restart.
1965 : */
1966 244 : pa_decr_and_wait_stream_block();
1967 242 : break;
1968 :
1969 0 : default:
1970 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1971 : break;
1972 : }
1973 :
1974 840 : in_streamed_transaction = false;
1975 840 : stream_xid = InvalidTransactionId;
1976 :
1977 : /*
1978 : * The parallel apply worker could be in a transaction in which case we
1979 : * need to report the state as STATE_IDLEINTRANSACTION.
1980 : */
1981 840 : if (IsTransactionOrTransactionBlock())
1982 242 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1983 : else
1984 598 : pgstat_report_activity(STATE_IDLE, NULL);
1985 :
1986 840 : reset_apply_error_context_info();
1987 840 : }
1988 :
1989 : /*
1990 : * Helper function to handle STREAM ABORT message when the transaction was
1991 : * serialized to file.
      : *
      : * xid - remote toplevel transaction id.
      : * subxid - aborted (sub)transaction id; equal to xid when the whole
      : * toplevel transaction is aborted, in which case the spool files are
      : * simply removed.  Otherwise the changes file is truncated back to the
      : * offset recorded for that subxact.
1992 : */
1993 : static void
1994 14 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1995 : {
1996 : /*
1997 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1998 : * just delete the files with serialized info.
1999 : */
2000 14 : if (xid == subxid)
2001 1 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2002 : else
2003 : {
2004 : /*
2005 : * OK, so it's a subxact. We need to read the subxact file for the
2006 : * toplevel transaction, determine the offset tracked for the subxact,
2007 : * and truncate the file with changes. We also remove the subxacts
2008 : * with higher offsets (or rather higher XIDs).
2009 : *
2010 : * We intentionally scan the array from the tail, because we're likely
2011 : * aborting a change for the most recent subtransactions.
2012 : *
2013 : * We can't use the binary search here as subxact XIDs won't
2014 : * necessarily arrive in sorted order, consider the case where we have
2015 : * released the savepoint for multiple subtransactions and then
2016 : * performed rollback to savepoint for one of the earlier
2017 : * sub-transaction.
2018 : */
2019 : int64 i;
2020 : int64 subidx;
2021 : BufFile *fd;
2022 13 : bool found = false;
2023 : char path[MAXPGPATH];
2024 :
2025 13 : subidx = -1;
2026 13 : begin_replication_step();
2027 13 : subxact_info_read(MyLogicalRepWorker->subid, xid);
2028 :
2029 15 : for (i = subxact_data.nsubxacts; i > 0; i--)
2030 : {
2031 11 : if (subxact_data.subxacts[i - 1].xid == subxid)
2032 : {
2033 9 : subidx = (i - 1);
2034 9 : found = true;
2035 9 : break;
2036 : }
2037 : }
2038 :
2039 : /*
2040 : * If it's an empty sub-transaction then we will not find the subxid
2041 : * here so just cleanup the subxact info and return.
2042 : */
2043 13 : if (!found)
2044 : {
2045 : /* Cleanup the subxact info */
2046 4 : cleanup_subxact_info();
2047 4 : end_replication_step();
2048 4 : CommitTransactionCommand();
2049 4 : return;
2050 : }
2051 :
2052 : /* open the changes file */
2053 9 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2054 9 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
2055 : O_RDWR, false);
2056 :
2057 : /* OK, truncate the file at the right offset */
2058 9 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
2059 9 : subxact_data.subxacts[subidx].offset);
2060 9 : BufFileClose(fd);
2061 :
2062 : /* discard the subxacts added later */
2063 9 : subxact_data.nsubxacts = subidx;
2064 :
2065 : /* write the updated subxact list */
2066 9 : subxact_info_write(MyLogicalRepWorker->subid, xid);
2067 :
2068 9 : end_replication_step();
2069 9 : CommitTransactionCommand();
2070 : }
2071 : }
2072 :
2073 : /*
2074 : * Handle STREAM ABORT message.
      : *
      : * Aborts either the whole streamed transaction (xid == subxid) or a
      : * single subtransaction.  The leader either truncates/removes the spool
      : * files itself, forwards the message to the parallel apply worker, or
      : * appends it to the spool file; the parallel apply worker rolls back
      : * via pa_stream_abort().
2075 : */
2076 : static void
2077 38 : apply_handle_stream_abort(StringInfo s)
2078 : {
2079 : TransactionId xid;
2080 : TransactionId subxid;
2081 : LogicalRepStreamAbortData abort_data;
2082 : ParallelApplyWorkerInfo *winfo;
2083 : TransApplyAction apply_action;
2084 :
2085 : /* Save the message before it is consumed. */
2086 38 : StringInfoData original_msg = *s;
2087 : bool toplevel_xact;
2088 :
2089 38 : if (in_streamed_transaction)
2090 0 : ereport(ERROR,
2091 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2092 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
2093 :
2094 : /* We receive abort information only when we can apply in parallel. */
2095 38 : logicalrep_read_stream_abort(s, &abort_data,
2096 38 : MyLogicalRepWorker->parallel_apply);
2097 :
2098 38 : xid = abort_data.xid;
2099 38 : subxid = abort_data.subxid;
2100 38 : toplevel_xact = (xid == subxid);
2101 :
2102 38 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2103 :
2104 38 : apply_action = get_transaction_apply_action(xid, &winfo);
2105 :
2106 38 : switch (apply_action)
2107 : {
2108 14 : case TRANS_LEADER_APPLY:
2109 :
2110 : /*
2111 : * We are in the leader apply worker and the transaction has been
2112 : * serialized to file.
2113 : */
2114 14 : stream_abort_internal(xid, subxid);
2115 :
2116 14 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2117 14 : break;
2118 :
2119 10 : case TRANS_LEADER_SEND_TO_PARALLEL:
2120 : Assert(winfo);
2121 :
2122 : /*
2123 : * For the case of aborting the subtransaction, we increment the
2124 : * number of streaming blocks and take the lock again before
2125 : * sending the STREAM_ABORT to ensure that the parallel apply
2126 : * worker will wait on the lock for the next set of changes after
2127 : * processing the STREAM_ABORT message if it is not already
2128 : * waiting for STREAM_STOP message.
2129 : *
2130 : * It is important to perform this locking before sending the
2131 : * STREAM_ABORT message so that the leader can hold the lock first
2132 : * and the parallel apply worker will wait for the leader to
2133 : * release the lock. This is the same as what we do in
2134 : * apply_handle_stream_stop. See Locking Considerations atop
2135 : * applyparallelworker.c.
2136 : */
2137 10 : if (!toplevel_xact)
2138 : {
2139 9 : pa_unlock_stream(xid, AccessExclusiveLock);
2140 9 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
2141 9 : pa_lock_stream(xid, AccessExclusiveLock);
2142 : }
2143 :
2144 10 : if (pa_send_data(winfo, s->len, s->data))
2145 : {
2146 : /*
2147 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2148 : * wait here for the parallel apply worker to finish as that
2149 : * is not required to maintain the commit order and won't have
2150 : * the risk of failures due to transaction dependencies and
2151 : * deadlocks. However, it is possible that before the parallel
2152 : * worker finishes and we clear the worker info, the xid
2153 : * wraparound happens on the upstream and a new transaction
2154 : * with the same xid can appear and that can lead to duplicate
2155 : * entries in ParallelApplyTxnHash. Yet another problem could
2156 : * be that we may have serialized the changes in partial
2157 : * serialize mode and the file containing xact changes may
2158 : * already exist, and after xid wraparound trying to create
2159 : * the file for the same xid can lead to an error. To avoid
2160 : * these problems, we decide to wait for the aborts to finish.
2161 : *
2162 : * Note, it is okay to not update the flush location position
2163 : * for aborts as in worst case that means such a transaction
2164 : * won't be sent again after restart.
2165 : */
2166 10 : if (toplevel_xact)
2167 1 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2168 :
2169 10 : break;
2170 : }
2171 :
2172 : /*
2173 : * Switch to serialize mode when we are not able to send the
2174 : * change to parallel apply worker.
2175 : */
2176 0 : pa_switch_to_partial_serialize(winfo, true);
2177 :
2178 : pg_fallthrough;
2179 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2180 : Assert(winfo);
2181 :
2182 : /*
2183 : * Parallel apply worker might have applied some changes, so write
2184 : * the STREAM_ABORT message so that it can rollback the
2185 : * subtransaction if needed.
2186 : */
2187 2 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
2188 : &original_msg);
2189 :
2190 2 : if (toplevel_xact)
2191 : {
2192 1 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2193 1 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2194 : }
2195 2 : break;
2196 :
2197 12 : case TRANS_PARALLEL_APPLY:
2198 :
2199 : /*
2200 : * If the parallel apply worker is applying spooled messages then
2201 : * close the file before aborting.
2202 : */
2203 12 : if (toplevel_xact && stream_fd)
2204 1 : stream_close_file();
2205 :
2206 12 : pa_stream_abort(&abort_data);
2207 :
2208 : /*
2209 : * We need to wait after processing rollback to savepoint for the
2210 : * next set of changes.
2211 : *
2212 : * We have a race condition here due to which we can start waiting
2213 : * here when there are more chunk of streams in the queue. See
2214 : * apply_handle_stream_stop.
2215 : */
2216 12 : if (!toplevel_xact)
2217 10 : pa_decr_and_wait_stream_block();
2218 :
2219 12 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2220 12 : break;
2221 :
2222 0 : default:
2223 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2224 : break;
2225 : }
2226 :
2227 38 : reset_apply_error_context_info();
2228 38 : }
2229 :
2230 : /*
2231 : * Ensure that the passed location is fileset's end.
      : *
      : * stream_fileset - fileset containing the changes file for xid.
      : * fileno/offset - location that must equal the end of that file;
      : * otherwise we elog(ERROR), since a transaction-end message must be
      : * the last message in the spool file.
      : *
      : * Must be called outside a transaction (asserted); a replication step
      : * is started and ended internally for the BufFile access.
2232 : */
2233 : static void
2234 4 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2235 : pgoff_t offset)
2236 : {
2237 : char path[MAXPGPATH];
2238 : BufFile *fd;
2239 : int last_fileno;
2240 : pgoff_t last_offset;
2241 :
2242 : Assert(!IsTransactionState());
2243 :
2244 4 : begin_replication_step();
2245 :
2246 4 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2247 :
2248 4 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2249 :
2250 4 : BufFileSeek(fd, 0, 0, SEEK_END);
2251 4 : BufFileTell(fd, &last_fileno, &last_offset);
2252 :
2253 4 : BufFileClose(fd);
2254 :
2255 4 : end_replication_step();
2256 :
2257 4 : if (last_fileno != fileno || last_offset != offset)
2258 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2259 : path);
2260 4 : }
2261 :
2262 : /*
2263 : * Common spoolfile processing.
      : *
      : * Replays every message previously serialized for 'xid' in
      : * 'stream_fileset', passing each one through apply_dispatch().  'lsn'
      : * is the commit/prepare LSN of the transaction and becomes
      : * remote_final_lsn for the replay.  If a dispatched transaction-end
      : * message closes the spool file (stream_fd reset), it is verified to
      : * have been the last message in the file.
2264 : */
2265 : void
2266 31 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2267 : XLogRecPtr lsn)
2268 : {
2269 : int nchanges;
2270 : char path[MAXPGPATH];
2271 31 : char *buffer = NULL;
2272 : MemoryContext oldcxt;
2273 : ResourceOwner oldowner;
2274 : int fileno;
2275 : pgoff_t offset;
2276 :
2277 31 : if (!am_parallel_apply_worker())
2278 27 : maybe_start_skipping_changes(lsn);
2279 :
2280 : /* Make sure we have an open transaction */
2281 31 : begin_replication_step();
2282 :
2283 : /*
2284 : * Allocate file handle and memory required to process all the messages in
2285 : * TopTransactionContext to avoid them getting reset after each message is
2286 : * processed.
2287 : */
2288 31 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2289 :
2290 : /* Open the spool file for the committed/prepared transaction */
2291 31 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2292 31 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2293 :
2294 : /*
2295 : * Make sure the file is owned by the toplevel transaction so that the
2296 : * file will not be accidentally closed when aborting a subtransaction.
2297 : */
2298 31 : oldowner = CurrentResourceOwner;
2299 31 : CurrentResourceOwner = TopTransactionResourceOwner;
2300 :
2301 31 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2302 :
2303 31 : CurrentResourceOwner = oldowner;
2304 :
2305 31 : buffer = palloc(BLCKSZ);
2306 :
2307 31 : MemoryContextSwitchTo(oldcxt);
2308 :
2309 31 : remote_final_lsn = lsn;
2310 :
2311 : /*
2312 : * Make sure the handle apply_dispatch methods are aware we're in a remote
2313 : * transaction.
2314 : */
2315 31 : in_remote_transaction = true;
2316 31 : pgstat_report_activity(STATE_RUNNING, NULL);
2317 :
2318 31 : end_replication_step();
2319 :
2320 : /*
2321 : * Read the entries one by one and pass them through the same logic as in
2322 : * apply_dispatch.
2323 : */
2324 31 : nchanges = 0;
2325 : while (true)
2326 88469 : {
2327 : StringInfoData s2;
2328 : size_t nbytes;
2329 : int len;
2330 :
2331 88500 : CHECK_FOR_INTERRUPTS();
2332 :
2333 : /* read length of the on-disk record */
2334 88500 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2335 :
2336 : /* have we reached end of the file? */
2337 88500 : if (nbytes == 0)
2338 26 : break;
2339 :
2340 : /* do we have a correct length? */
2341 88474 : if (len <= 0)
2342 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2343 : len, path);
2344 :
2345 : /* make sure we have sufficiently large buffer */
2346 88474 : buffer = repalloc(buffer, len);
2347 :
2348 : /* and finally read the data into the buffer */
2349 88474 : BufFileReadExact(stream_fd, buffer, len);
2350 :
2351 88474 : BufFileTell(stream_fd, &fileno, &offset);
2352 :
2353 : /* init a stringinfo using the buffer and call apply_dispatch */
2354 88474 : initReadOnlyStringInfo(&s2, buffer, len);
2355 :
2356 : /* Ensure we are reading the data into our memory context. */
2357 88474 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2358 :
2359 88474 : apply_dispatch(&s2);
2360 :
2361 88473 : MemoryContextReset(ApplyMessageContext);
2362 :
2363 88473 : MemoryContextSwitchTo(oldcxt);
2364 :
2365 88473 : nchanges++;
2366 :
2367 : /*
2368 : * It is possible the file has been closed because we have processed
2369 : * the transaction end message like stream_commit in which case that
2370 : * must be the last message.
2371 : */
2372 88473 : if (!stream_fd)
2373 : {
2374 4 : ensure_last_message(stream_fileset, xid, fileno, offset);
2375 4 : break;
2376 : }
2377 :
2378 88469 : if (nchanges % 1000 == 0)
2379 83 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2380 : nchanges, path);
2381 : }
2382 :
2383 30 : if (stream_fd)
2384 26 : stream_close_file();
2385 :
2386 30 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2387 : nchanges, path);
2388 :
2389 30 : return;
2390 : }
2391 :
2392 : /*
2393 : * Handle STREAM COMMIT message.
      : *
      : * Commits a streamed transaction.  As with STREAM PREPARE, the action
      : * from get_transaction_apply_action() decides whether the leader
      : * replays its spool file and commits, forwards the message to the
      : * parallel apply worker, appends it to the spool file, or — in the
      : * parallel apply worker — commits directly and publishes the commit
      : * end LSN for the leader.
2394 : */
2395 : static void
2396 61 : apply_handle_stream_commit(StringInfo s)
2397 : {
2398 : TransactionId xid;
2399 : LogicalRepCommitData commit_data;
2400 : ParallelApplyWorkerInfo *winfo;
2401 : TransApplyAction apply_action;
2402 :
2403 : /* Save the message before it is consumed. */
2404 61 : StringInfoData original_msg = *s;
2405 :
2406 61 : if (in_streamed_transaction)
2407 0 : ereport(ERROR,
2408 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2409 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2410 :
2411 61 : xid = logicalrep_read_stream_commit(s, &commit_data);
2412 61 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2413 :
2414 61 : apply_action = get_transaction_apply_action(xid, &winfo);
2415 :
2416 61 : switch (apply_action)
2417 : {
2418 22 : case TRANS_LEADER_APPLY:
2419 :
2420 : /*
2421 : * The transaction has been serialized to file, so replay all the
2422 : * spooled operations.
2423 : */
2424 22 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2425 : commit_data.commit_lsn);
2426 :
2427 21 : apply_handle_commit_internal(&commit_data);
2428 :
2429 : /* Unlink the files with serialized changes and subxact info. */
2430 21 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2431 :
2432 21 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2433 21 : break;
2434 :
2435 18 : case TRANS_LEADER_SEND_TO_PARALLEL:
2436 : Assert(winfo);
2437 :
2438 18 : if (pa_send_data(winfo, s->len, s->data))
2439 : {
2440 : /* Finish processing the streaming transaction. */
2441 18 : pa_xact_finish(winfo, commit_data.end_lsn);
2442 17 : break;
2443 : }
2444 :
2445 : /*
2446 : * Switch to serialize mode when we are not able to send the
2447 : * change to parallel apply worker.
2448 : */
2449 0 : pa_switch_to_partial_serialize(winfo, true);
2450 :
2451 : pg_fallthrough;
2452 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2453 : Assert(winfo);
2454 :
2455 2 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2456 : &original_msg);
2457 :
2458 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2459 :
2460 : /* Finish processing the streaming transaction. */
2461 2 : pa_xact_finish(winfo, commit_data.end_lsn);
2462 2 : break;
2463 :
2464 19 : case TRANS_PARALLEL_APPLY:
2465 :
2466 : /*
2467 : * If the parallel apply worker is applying spooled messages then
2468 : * close the file before committing.
2469 : */
2470 19 : if (stream_fd)
2471 2 : stream_close_file();
2472 :
2473 19 : apply_handle_commit_internal(&commit_data);
2474 :
2475 19 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2476 :
2477 : /*
2478 : * It is important to set the transaction state as finished before
2479 : * releasing the lock. See pa_wait_for_xact_finish.
2480 : */
2481 19 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2482 19 : pa_unlock_transaction(xid, AccessExclusiveLock);
2483 :
2484 19 : pa_reset_subtrans();
2485 :
2486 19 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2487 19 : break;
2488 :
2489 0 : default:
2490 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2491 : break;
2492 : }
2493 :
2494 : /*
2495 : * Process any tables that are being synchronized in parallel, as well as
2496 : * any newly added tables or sequences.
2497 : */
2498 59 : ProcessSyncingRelations(commit_data.end_lsn);
2499 :
2500 59 : pgstat_report_activity(STATE_IDLE, NULL);
2501 :
2502 59 : reset_apply_error_context_info();
2503 59 : }
2504 :
2505 : /*
2506 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
      : *
      : * Commits the current apply transaction: clears any pending skip-LSN,
      : * records the replication-origin progress (end_lsn / committime) so
      : * streaming can restart from the right position after a crash, commits,
      : * and advances the flush position.  Also finishes a skipped transaction
      : * if we were skipping changes.  Clears in_remote_transaction on exit.
2507 : */
2508 : static void
2509 490 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2510 : {
2511 490 : if (is_skipping_changes())
2512 : {
2513 2 : stop_skipping_changes();
2514 :
2515 : /*
2516 : * Start a new transaction to clear the subskiplsn, if not started
2517 : * yet.
2518 : */
2519 2 : if (!IsTransactionState())
2520 1 : StartTransactionCommand();
2521 : }
2522 :
2523 490 : if (IsTransactionState())
2524 : {
2525 : /*
2526 : * The transaction is either non-empty or skipped, so we clear the
2527 : * subskiplsn.
2528 : */
2529 490 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2530 :
2531 : /*
2532 : * Update origin state so we can restart streaming from correct
2533 : * position in case of crash.
2534 : */
2535 490 : replorigin_xact_state.origin_lsn = commit_data->end_lsn;
2536 490 : replorigin_xact_state.origin_timestamp = commit_data->committime;
2537 :
2538 490 : CommitTransactionCommand();
2539 :
2540 490 : if (IsTransactionBlock())
2541 : {
2542 4 : EndTransactionBlock(false);
2543 4 : CommitTransactionCommand();
2544 : }
2545 :
2546 490 : pgstat_report_stat(false);
2547 :
2548 490 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2549 : }
2550 : else
2551 : {
2552 : /* Process any invalidation messages that might have accumulated. */
2553 0 : AcceptInvalidationMessages();
2554 0 : maybe_reread_subscription();
2555 : }
2556 :
2557 490 : in_remote_transaction = false;
2558 490 : }
2559 :
2560 : /*
2561 : * Handle RELATION message.
2562 : *
2563 : * Note we don't do validation against local schema here. The validation
2564 : * against local schema is postponed until first change for given relation
2565 : * comes as we only care about it when applying changes for it anyway and we
2566 : * do less locking this way.
 : *
 : * Inside a streamed transaction the message is instead handed off to
 : * handle_streamed_transaction() (presumably queued for later replay --- see
 : * that function) and we return immediately.
2567 : */
2568 : static void
2569 491 : apply_handle_relation(StringInfo s)
2570 : {
2571 : LogicalRepRelation *rel;
2572 :
2573 491 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2574 35 : return;
2575 :
2576 456 : rel = logicalrep_read_rel(s);
2577 456 : logicalrep_relmap_update(rel);
2578 :
2579 : /* Also reset all entries in the partition map that refer to remoterel. */
2580 456 : logicalrep_partmap_reset_relmap(rel);
2581 : }
2582 :
2583 : /*
2584 : * Handle TYPE message.
2585 : *
2586 : * This implementation pays no attention to TYPE messages; we expect the user
2587 : * to have set things up so that the incoming data is acceptable to the input
2588 : * functions for the locally subscribed tables. Hence, we just read and
2589 : * discard the message.
2590 : */
2591 : static void
2592 18 : apply_handle_type(StringInfo s)
2593 : {
2594 : LogicalRepTyp typ;
2595 :
2596 18 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2597 0 : return;
2598 :
 : /* Parse (and then ignore) the message so its bytes are fully consumed. */
2599 18 : logicalrep_read_typ(s, &typ);
2600 : }
2601 :
2602 : /*
2603 : * Check that we (the subscription owner) have sufficient privileges on the
2604 : * target relation to perform the given operation.
 : *
 : * rel - already-open target relation
 : * mode - ACL_* mode bit(s) for the operation being applied
 : *
 : * Raises an error (does not return) if the ACL check fails or if row-level
 : * security is enabled on the relation.
2605 : */
2606 : static void
2607 220602 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2608 : {
2609 : Oid relid;
2610 : AclResult aclresult;
2611 :
2612 220602 : relid = RelationGetRelid(rel);
2613 220602 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2614 220602 : if (aclresult != ACLCHECK_OK)
2615 9 : aclcheck_error(aclresult,
2616 9 : get_relkind_objtype(rel->rd_rel->relkind),
2617 9 : get_rel_name(relid));
2618 :
2619 : /*
2620 : * We lack the infrastructure to honor RLS policies. It might be possible
2621 : * to add such infrastructure here, but tablesync workers lack it, too, so
2622 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2623 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2624 : * replicate subsequent INSERTs, so we forbid all commands the same.
2625 : */
2626 220593 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2627 4 : ereport(ERROR,
2628 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2629 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2630 : GetUserNameFromId(GetUserId(), true),
2631 : RelationGetRelationName(rel))));
2632 220589 : }
2633 :
2634 : /*
2635 : * Handle INSERT message.
 : *
 : * Parses the message, opens the target relation, builds the remote tuple in
 : * an executor slot (filling in local column defaults), and inserts it,
 : * routing to the appropriate leaf partition if the target is partitioned.
2636 : */
2637 :
2638 : static void
2639 186119 : apply_handle_insert(StringInfo s)
2640 : {
2641 : LogicalRepRelMapEntry *rel;
2642 : LogicalRepTupleData newtup;
2643 : LogicalRepRelId relid;
2644 : UserContext ucxt;
2645 : ApplyExecutionData *edata;
2646 : EState *estate;
2647 : TupleTableSlot *remoteslot;
2648 : MemoryContext oldctx;
2649 : bool run_as_owner;
2650 :
2651 : /*
2652 : * Quick return if we are skipping data modification changes or handling
2653 : * streamed transactions.
2654 : */
2655 362237 : if (is_skipping_changes() ||
2656 176118 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2657 110047 : return;
2658 :
2659 76110 : begin_replication_step();
2660 :
2661 76108 : relid = logicalrep_read_insert(s, &newtup);
2662 76108 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2663 76098 : if (!should_apply_changes_for_rel(rel))
2664 : {
2665 : /*
2666 : * The relation can't become interesting in the middle of the
2667 : * transaction so it's safe to unlock it.
2668 : */
2669 38 : logicalrep_rel_close(rel, RowExclusiveLock);
2670 38 : end_replication_step();
2671 38 : return;
2672 : }
2673 :
2674 : /*
2675 : * Make sure that any user-supplied code runs as the table owner, unless
2676 : * the user has opted out of that behavior.
2677 : */
2678 76060 : run_as_owner = MySubscription->runasowner;
2679 76060 : if (!run_as_owner)
2680 76052 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2681 :
2682 : /* Set relation for error callback */
2683 76060 : apply_error_callback_arg.rel = rel;
2684 :
2685 : /* Initialize the executor state. */
2686 76060 : edata = create_edata_for_relation(rel);
2687 76060 : estate = edata->estate;
2688 76060 : remoteslot = ExecInitExtraTupleSlot(estate,
2689 76060 : RelationGetDescr(rel->localrel),
2690 : &TTSOpsVirtual);
2691 :
2692 : /* Process and store remote tuple in the slot */
2693 76060 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2694 76060 : slot_store_data(remoteslot, rel, &newtup);
2695 76060 : slot_fill_defaults(rel, estate, remoteslot);
2696 76060 : MemoryContextSwitchTo(oldctx);
2697 :
2698 : /* For a partitioned table, insert the tuple into a partition. */
2699 76060 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2700 60 : apply_handle_tuple_routing(edata,
2701 : remoteslot, NULL, CMD_INSERT);
2702 : else
2703 : {
2704 76000 : ResultRelInfo *relinfo = edata->targetRelInfo;
2705 :
2706 76000 : ExecOpenIndices(relinfo, false);
2707 76000 : apply_handle_insert_internal(edata, relinfo, remoteslot);
2708 75984 : ExecCloseIndices(relinfo);
2709 : }
2710 :
2711 76028 : finish_edata(edata);
2712 :
2713 : /* Reset relation for error callback */
2714 76028 : apply_error_callback_arg.rel = NULL;
2715 :
2716 76028 : if (!run_as_owner)
2717 76023 : RestoreUserContext(&ucxt);
2718 :
2719 76028 : logicalrep_rel_close(rel, NoLock);
2720 :
2721 76028 : end_replication_step();
2722 : }
2723 :
2724 : /*
2725 : * Workhorse for apply_handle_insert()
2726 : * relinfo is for the relation we're actually inserting into
2727 : * (could be a child partition of edata->targetRelInfo)
 : *
 : * Performs the privilege check and the actual insert; errors from either
 : * propagate to the caller.
2728 : */
2729 : static void
2730 76061 : apply_handle_insert_internal(ApplyExecutionData *edata,
2731 : ResultRelInfo *relinfo,
2732 : TupleTableSlot *remoteslot)
2733 : {
2734 76061 : EState *estate = edata->estate;
2735 :
2736 : /* Caller should have opened indexes already. */
2737 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2738 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2739 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2740 :
2741 : /* Caller will not have done this bit. */
2742 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2743 76061 : InitConflictIndexes(relinfo);
2744 :
2745 : /* Do the insert. */
2746 76061 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2747 76054 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2748 76029 : }
2749 :
2750 : /*
2751 : * Check if the logical replication relation is updatable and throw
2752 : * appropriate error if it isn't.
 : *
 : * "Updatable" here means we can locate rows to update/delete, i.e. the
 : * local relation has a usable replica identity or primary key (see the
 : * error messages below).
2753 : */
2754 : static void
2755 72300 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2756 : {
2757 : /*
2758 : * For partitioned tables, we only need to care if the target partition is
2759 : * updatable (aka has PK or RI defined for it).
2760 : */
2761 72300 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2762 30 : return;
2763 :
2764 : /* Updatable, no error. */
2765 72270 : if (rel->updatable)
2766 72270 : return;
2767 :
2768 : /*
2769 : * We are in error mode so it's fine this is somewhat slow. It's better to
2770 : * give user correct error.
2771 : */
2772 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2773 : {
2774 0 : ereport(ERROR,
2775 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2776 : errmsg("publisher did not send replica identity column "
2777 : "expected by the logical replication target relation \"%s.%s\"",
2778 : rel->remoterel.nspname, rel->remoterel.relname)));
2779 : }
2780 :
2781 0 : ereport(ERROR,
2782 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2783 : errmsg("logical replication target relation \"%s.%s\" has "
2784 : "neither REPLICA IDENTITY index nor PRIMARY "
2785 : "KEY and published relation does not have "
2786 : "REPLICA IDENTITY FULL",
2787 : rel->remoterel.nspname, rel->remoterel.relname)));
2788 : }
2789 :
2790 : /*
2791 : * Handle UPDATE message.
2792 : *
2793 : * TODO: FDW support
 : *
 : * Finds the local row matching the old/key tuple sent by the publisher and
 : * applies the new column values to it, routing to the appropriate leaf
 : * partition when the target is partitioned. has_oldtup is set by
 : * logicalrep_read_update() when the message carries a separate old tuple.
2794 : */
2795 : static void
2796 66173 : apply_handle_update(StringInfo s)
2797 : {
2798 : LogicalRepRelMapEntry *rel;
2799 : LogicalRepRelId relid;
2800 : UserContext ucxt;
2801 : ApplyExecutionData *edata;
2802 : EState *estate;
2803 : LogicalRepTupleData oldtup;
2804 : LogicalRepTupleData newtup;
2805 : bool has_oldtup;
2806 : TupleTableSlot *remoteslot;
2807 : RTEPermissionInfo *target_perminfo;
2808 : MemoryContext oldctx;
2809 : bool run_as_owner;
2810 :
2811 : /*
2812 : * Quick return if we are skipping data modification changes or handling
2813 : * streamed transactions.
2814 : */
2815 132343 : if (is_skipping_changes() ||
2816 66170 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
2817 34223 : return;
2818 :
2819 31950 : begin_replication_step();
2820 :
2821 31949 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2822 : &newtup);
2823 31949 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2824 31949 : if (!should_apply_changes_for_rel(rel))
2825 : {
2826 : /*
2827 : * The relation can't become interesting in the middle of the
2828 : * transaction so it's safe to unlock it.
2829 : */
2830 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2831 0 : end_replication_step();
2832 0 : return;
2833 : }
2834 :
2835 : /* Set relation for error callback */
2836 31949 : apply_error_callback_arg.rel = rel;
2837 :
2838 : /* Check if we can do the update. */
2839 31949 : check_relation_updatable(rel);
2840 :
2841 : /*
2842 : * Make sure that any user-supplied code runs as the table owner, unless
2843 : * the user has opted out of that behavior.
2844 : */
2845 31949 : run_as_owner = MySubscription->runasowner;
2846 31949 : if (!run_as_owner)
2847 31946 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2848 :
2849 : /* Initialize the executor state. */
2850 31948 : edata = create_edata_for_relation(rel);
2851 31948 : estate = edata->estate;
2852 31948 : remoteslot = ExecInitExtraTupleSlot(estate,
2853 31948 : RelationGetDescr(rel->localrel),
2854 : &TTSOpsVirtual);
2855 :
2856 : /*
2857 : * Populate updatedCols so that per-column triggers can fire, and so
2858 : * executor can correctly pass down indexUnchanged hint. This could
2859 : * include more columns than were actually changed on the publisher
2860 : * because the logical replication protocol doesn't contain that
2861 : * information. But it would for example exclude columns that only exist
2862 : * on the subscriber, since we are not touching those.
2863 : */
2864 31948 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
2865 159353 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2866 : {
2867 127405 : CompactAttribute *att = TupleDescCompactAttr(remoteslot->tts_tupleDescriptor, i);
2868 127405 : int remoteattnum = rel->attrmap->attnums[i];
2869 :
2870 127405 : if (!att->attisdropped && remoteattnum >= 0)
2871 : {
2872 : Assert(remoteattnum < newtup.ncols);
2873 68888 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2874 68888 : target_perminfo->updatedCols =
2875 68888 : bms_add_member(target_perminfo->updatedCols,
2876 : i + 1 - FirstLowInvalidHeapAttributeNumber);
2877 : }
2878 : }
2879 :
2880 : /* Build the search tuple. */
2881 31948 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2882 31948 : slot_store_data(remoteslot, rel,
2883 31948 : has_oldtup ? &oldtup : &newtup);
2884 31948 : MemoryContextSwitchTo(oldctx);
2885 :
2886 : /* For a partitioned table, apply update to correct partition. */
2887 31948 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2888 13 : apply_handle_tuple_routing(edata,
2889 : remoteslot, &newtup, CMD_UPDATE);
2890 : else
2891 31935 : apply_handle_update_internal(edata, edata->targetRelInfo,
2892 : remoteslot, &newtup, rel->localindexoid);
2893 :
2894 31940 : finish_edata(edata);
2895 :
2896 : /* Reset relation for error callback */
2897 31940 : apply_error_callback_arg.rel = NULL;
2898 :
2899 31940 : if (!run_as_owner)
2900 31938 : RestoreUserContext(&ucxt);
2901 :
2902 31940 : logicalrep_rel_close(rel, NoLock);
2903 :
2904 31940 : end_replication_step();
2905 : }
2906 :
2907 : /*
2908 : * Workhorse for apply_handle_update()
2909 : * relinfo is for the relation we're actually updating in
2910 : * (could be a child partition of edata->targetRelInfo)
 : *
 : * newtup holds the new column values from the remote side; localindexoid is
 : * the OID of the index used to locate the local row (InvalidOid means a
 : * sequential scan is used). Conflicts are reported at LOG level and do not
 : * abort the apply.
2911 : */
2912 : static void
2913 31935 : apply_handle_update_internal(ApplyExecutionData *edata,
2914 : ResultRelInfo *relinfo,
2915 : TupleTableSlot *remoteslot,
2916 : LogicalRepTupleData *newtup,
2917 : Oid localindexoid)
2918 : {
2919 31935 : EState *estate = edata->estate;
2920 31935 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2921 31935 : Relation localrel = relinfo->ri_RelationDesc;
2922 : EPQState epqstate;
2923 31935 : TupleTableSlot *localslot = NULL;
2924 31935 : ConflictTupleInfo conflicttuple = {0};
2925 : bool found;
2926 : MemoryContext oldctx;
2927 :
2928 31935 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2929 31935 : ExecOpenIndices(relinfo, false);
2930 :
2931 31935 : found = FindReplTupleInLocalRel(edata, localrel,
2932 : &relmapentry->remoterel,
2933 : localindexoid,
2934 : remoteslot, &localslot);
2935 :
2936 : /*
2937 : * Tuple found.
2938 : *
2939 : * Note this will fail if there are other conflicting unique indexes.
2940 : */
2941 31929 : if (found)
2942 : {
2943 : /*
2944 : * Report the conflict if the tuple was modified by a different
2945 : * origin.
2946 : */
2947 31913 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2948 2 : &conflicttuple.origin, &conflicttuple.ts) &&
2949 2 : conflicttuple.origin != replorigin_xact_state.origin)
2950 : {
2951 : TupleTableSlot *newslot;
2952 :
2953 : /* Store the new tuple for conflict reporting */
2954 2 : newslot = table_slot_create(localrel, &estate->es_tupleTable);
2955 2 : slot_store_data(newslot, relmapentry, newtup);
2956 :
2957 2 : conflicttuple.slot = localslot;
2958 :
2959 2 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
2960 : remoteslot, newslot,
2961 2 : list_make1(&conflicttuple));
2962 : }
2963 :
2964 : /* Process and store remote tuple in the slot */
2965 31913 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2966 31913 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2967 31913 : MemoryContextSwitchTo(oldctx);
2968 :
2969 31913 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2970 :
2971 31913 : InitConflictIndexes(relinfo);
2972 :
2973 : /* Do the actual update. */
2974 31913 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2975 31913 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2976 : remoteslot);
2977 : }
2978 : else
2979 : {
2980 : ConflictType type;
2981 16 : TupleTableSlot *newslot = localslot;
2982 :
2983 : /*
2984 : * Detecting whether the tuple was recently deleted or never existed
2985 : * is crucial to avoid misleading the user during conflict handling.
2986 : */
2987 16 : if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
2988 : &conflicttuple.xmin,
2989 : &conflicttuple.origin,
2990 3 : &conflicttuple.ts) &&
2991 3 : conflicttuple.origin != replorigin_xact_state.origin)
2992 3 : type = CT_UPDATE_DELETED;
2993 : else
2994 13 : type = CT_UPDATE_MISSING;
2995 :
2996 : /* Store the new tuple for conflict reporting */
2997 16 : slot_store_data(newslot, relmapentry, newtup);
2998 :
2999 : /*
3000 : * The tuple to be updated could not be found or was deleted. Do
3001 : * nothing except for emitting a log message.
3002 : */
3003 16 : ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
3004 16 : list_make1(&conflicttuple));
3005 : }
3006 :
3007 : /* Cleanup. */
3008 31927 : ExecCloseIndices(relinfo);
3009 31927 : EvalPlanQualEnd(&epqstate);
3010 31927 : }
3011 :
3012 : /*
3013 : * Handle DELETE message.
3014 : *
3015 : * TODO: FDW support
 : *
 : * Finds the local row matching the key tuple sent by the publisher and
 : * deletes it, routing to the appropriate leaf partition when the target is
 : * partitioned. A missing row is reported as a conflict, not an error.
3016 : */
3017 : static void
3018 81936 : apply_handle_delete(StringInfo s)
3019 : {
3020 : LogicalRepRelMapEntry *rel;
3021 : LogicalRepTupleData oldtup;
3022 : LogicalRepRelId relid;
3023 : UserContext ucxt;
3024 : ApplyExecutionData *edata;
3025 : EState *estate;
3026 : TupleTableSlot *remoteslot;
3027 : MemoryContext oldctx;
3028 : bool run_as_owner;
3029 :
3030 : /*
3031 : * Quick return if we are skipping data modification changes or handling
3032 : * streamed transactions.
3033 : */
3034 163872 : if (is_skipping_changes() ||
3035 81936 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
3036 41615 : return;
3037 :
3038 40321 : begin_replication_step();
3039 :
3040 40321 : relid = logicalrep_read_delete(s, &oldtup);
3041 40321 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
3042 40321 : if (!should_apply_changes_for_rel(rel))
3043 : {
3044 : /*
3045 : * The relation can't become interesting in the middle of the
3046 : * transaction so it's safe to unlock it.
3047 : */
3048 0 : logicalrep_rel_close(rel, RowExclusiveLock);
3049 0 : end_replication_step();
3050 0 : return;
3051 : }
3052 :
3053 : /* Set relation for error callback */
3054 40321 : apply_error_callback_arg.rel = rel;
3055 :
3056 : /* Check if we can do the delete. */
3057 40321 : check_relation_updatable(rel);
3058 :
3059 : /*
3060 : * Make sure that any user-supplied code runs as the table owner, unless
3061 : * the user has opted out of that behavior.
3062 : */
3063 40321 : run_as_owner = MySubscription->runasowner;
3064 40321 : if (!run_as_owner)
3065 40319 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3066 :
3067 : /* Initialize the executor state. */
3068 40321 : edata = create_edata_for_relation(rel);
3069 40321 : estate = edata->estate;
3070 40321 : remoteslot = ExecInitExtraTupleSlot(estate,
3071 40321 : RelationGetDescr(rel->localrel),
3072 : &TTSOpsVirtual);
3073 :
3074 : /* Build the search tuple. */
3075 40321 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3076 40321 : slot_store_data(remoteslot, rel, &oldtup);
3077 40321 : MemoryContextSwitchTo(oldctx);
3078 :
3079 : /* For a partitioned table, apply delete to correct partition. */
3080 40321 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3081 17 : apply_handle_tuple_routing(edata,
3082 : remoteslot, NULL, CMD_DELETE);
3083 : else
3084 : {
3085 40304 : ResultRelInfo *relinfo = edata->targetRelInfo;
3086 :
3087 40304 : ExecOpenIndices(relinfo, false);
3088 40304 : apply_handle_delete_internal(edata, relinfo,
3089 : remoteslot, rel->localindexoid);
3090 40304 : ExecCloseIndices(relinfo);
3091 : }
3092 :
3093 40321 : finish_edata(edata);
3094 :
3095 : /* Reset relation for error callback */
3096 40321 : apply_error_callback_arg.rel = NULL;
3097 :
3098 40321 : if (!run_as_owner)
3099 40319 : RestoreUserContext(&ucxt);
3100 :
3101 40321 : logicalrep_rel_close(rel, NoLock);
3102 :
3103 40321 : end_replication_step();
3104 : }
3105 :
3106 : /*
3107 : * Workhorse for apply_handle_delete()
3108 : * relinfo is for the relation we're actually deleting from
3109 : * (could be a child partition of edata->targetRelInfo)
 : *
 : * localindexoid is the OID of the index used to locate the local row
 : * (InvalidOid means a sequential scan is used). Conflicts are reported at
 : * LOG level and do not abort the apply.
3110 : */
3111 : static void
3112 40321 : apply_handle_delete_internal(ApplyExecutionData *edata,
3113 : ResultRelInfo *relinfo,
3114 : TupleTableSlot *remoteslot,
3115 : Oid localindexoid)
3116 : {
3117 40321 : EState *estate = edata->estate;
3118 40321 : Relation localrel = relinfo->ri_RelationDesc;
3119 40321 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3120 : EPQState epqstate;
3121 : TupleTableSlot *localslot;
3122 40321 : ConflictTupleInfo conflicttuple = {0};
3123 : bool found;
3124 :
3125 40321 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3126 :
3127 : /* Caller should have opened indexes already. */
3128 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
3129 : !localrel->rd_rel->relhasindex ||
3130 : RelationGetIndexList(localrel) == NIL);
3131 :
3132 40321 : found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3133 : remoteslot, &localslot);
3134 :
3135 : /* If found delete it. */
3136 40321 : if (found)
3137 : {
3138 : /*
3139 : * Report the conflict if the tuple was modified by a different
3140 : * origin.
3141 : */
3142 40312 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3143 6 : &conflicttuple.origin, &conflicttuple.ts) &&
3144 6 : conflicttuple.origin != replorigin_xact_state.origin)
3145 : {
3146 5 : conflicttuple.slot = localslot;
3147 5 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
3148 : remoteslot, NULL,
3149 5 : list_make1(&conflicttuple));
3150 : }
3151 :
3152 40312 : EvalPlanQualSetSlot(&epqstate, localslot);
3153 :
3154 : /* Do the actual delete. */
3155 40312 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3156 40312 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3157 : }
3158 : else
3159 : {
3160 : /*
3161 : * The tuple to be deleted could not be found. Do nothing except for
3162 : * emitting a log message.
3163 : */
3164 9 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
3165 9 : remoteslot, NULL, list_make1(&conflicttuple));
3166 : }
3167 :
3168 : /* Cleanup. */
3169 40321 : EvalPlanQualEnd(&epqstate);
3170 40321 : }
3171 :
3172 : /*
3173 : * Try to find a tuple received from the publication side (in 'remoteslot') in
3174 : * the corresponding local relation using either replica identity index,
3175 : * primary key, index or if needed, sequential scan.
3176 : *
3177 : * Local tuple, if found, is returned in '*localslot'.
 : *
 : * Returns true if a matching tuple was found; the found tuple is locked
 : * with LockTupleExclusive. Errors out if the subscription owner lacks
 : * SELECT privilege on the relation.
3178 : */
3179 : static bool
3180 72269 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
3181 : LogicalRepRelation *remoterel,
3182 : Oid localidxoid,
3183 : TupleTableSlot *remoteslot,
3184 : TupleTableSlot **localslot)
3185 : {
3186 72269 : EState *estate = edata->estate;
3187 : bool found;
3188 :
3189 : /*
3190 : * Regardless of the top-level operation, we're performing a read here, so
3191 : * check for SELECT privileges.
3192 : */
3193 72269 : TargetPrivilegesCheck(localrel, ACL_SELECT);
3194 :
3195 72263 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3196 :
3197 : Assert(OidIsValid(localidxoid) ||
3198 : (remoterel->replident == REPLICA_IDENTITY_FULL));
3199 :
3200 72263 : if (OidIsValid(localidxoid))
3201 : {
3202 : #ifdef USE_ASSERT_CHECKING
3203 : Relation idxrel = index_open(localidxoid, AccessShareLock);
3204 :
3205 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3206 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
3207 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
3208 : IsIndexUsableForReplicaIdentityFull(idxrel,
3209 : edata->targetRel->attrmap)));
3210 : index_close(idxrel, AccessShareLock);
3211 : #endif
3212 :
3213 72112 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
3214 : LockTupleExclusive,
3215 : remoteslot, *localslot);
3216 : }
3217 : else
3218 151 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
3219 : remoteslot, *localslot);
3220 :
3221 72263 : return found;
3222 : }
3223 :
3224 : /*
3225 : * Determine whether the index can reliably locate the deleted tuple in the
3226 : * local relation.
3227 : *
3228 : * An index may exclude deleted tuples if it was re-indexed or re-created during
3229 : * change application. Therefore, an index is considered usable only if the
3230 : * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3231 : * index tuple's xmin. This ensures that any tuples deleted prior to the index
3232 : * creation or re-indexing are not relevant for conflict detection in the
3233 : * current apply worker.
3234 : *
3235 : * Note that indexes may also be excluded if they were modified by other DDL
3236 : * operations, such as ALTER INDEX. However, this is acceptable, as the
3237 : * likelihood of such DDL changes coinciding with the need to scan dead
3238 : * tuples for the update_deleted is low.
 : *
 : * Returns true iff the xmin of localindexoid's pg_index tuple precedes
 : * conflict_detection_xmin.
3239 : */
3240 : static bool
3241 1 : IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
3242 : TransactionId conflict_detection_xmin)
3243 : {
3244 : HeapTuple index_tuple;
3245 : TransactionId index_xmin;
3246 :
3247 1 : index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
3248 :
3249 1 : if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3250 0 : elog(ERROR, "cache lookup failed for index %u", localindexoid);
3251 :
3252 : /*
3253 : * No need to check for a frozen transaction ID, as
3254 : * TransactionIdPrecedes() manages it internally, treating it as falling
3255 : * behind the conflict_detection_xmin.
3256 : */
3257 1 : index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
3258 :
3259 1 : ReleaseSysCache(index_tuple);
3260 :
3261 1 : return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
3262 : }
3263 :
3264 : /*
3265 : * Attempts to locate a deleted tuple in the local relation that matches the
3266 : * values of the tuple received from the publication side (in 'remoteslot').
3267 : * The search is performed using either the replica identity index, primary
3268 : * key, other available index, or a sequential scan if necessary.
3269 : *
3270 : * Returns true if the deleted tuple is found. If found, the transaction ID,
3271 : * origin, and commit timestamp of the deletion are stored in '*delete_xid',
3272 : * '*delete_origin', and '*delete_time' respectively.
 : *
 : * Returns false (without searching) when dead-tuple retention or commit
 : * timestamp tracking is disabled, or when the leader has already stopped
 : * retaining conflict-detection information.
3273 : */
3274 : static bool
3275 18 : FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
3276 : TupleTableSlot *remoteslot,
3277 : TransactionId *delete_xid, ReplOriginId *delete_origin,
3278 : TimestampTz *delete_time)
3279 : {
3280 : TransactionId oldestxmin;
3281 :
3282 : /*
3283 : * Return false if either dead tuples are not retained or commit timestamp
3284 : * data is not available.
3285 : */
3286 18 : if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
3287 15 : return false;
3288 :
3289 : /*
3290 : * For conflict detection, we use the leader worker's
3291 : * oldest_nonremovable_xid value instead of invoking
3292 : * GetOldestNonRemovableTransactionId() or using the conflict detection
3293 : * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3294 : * identify tuples that were recently deleted. These deleted tuples are no
3295 : * longer visible to concurrent transactions. However, if a remote update
3296 : * matches such a tuple, we log an update_deleted conflict.
3297 : *
3298 : * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3299 : * transaction IDs older than oldest_nonremovable_xid, for our current
3300 : * purpose, it is acceptable to treat tuples deleted by transactions prior
3301 : * to oldest_nonremovable_xid as update_missing conflicts.
3302 : */
3303 3 : if (am_leader_apply_worker())
3304 : {
3305 3 : oldestxmin = MyLogicalRepWorker->oldest_nonremovable_xid;
3306 : }
3307 : else
3308 : {
3309 : LogicalRepWorker *leader;
3310 :
3311 : /*
3312 : * Obtain the information from the leader apply worker as only the
3313 : * leader manages oldest_nonremovable_xid (see
3314 : * maybe_advance_nonremovable_xid() for details).
3315 : */
3316 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
3317 0 : leader = logicalrep_worker_find(WORKERTYPE_APPLY,
3318 0 : MyLogicalRepWorker->subid, InvalidOid,
3319 : false);
3320 0 : if (!leader)
3321 : {
3322 0 : ereport(ERROR,
3323 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3324 : errmsg("could not detect conflict as the leader apply worker has exited")));
3325 : }
3326 :
3327 0 : SpinLockAcquire(&leader->relmutex);
3328 0 : oldestxmin = leader->oldest_nonremovable_xid;
3329 0 : SpinLockRelease(&leader->relmutex);
3330 0 : LWLockRelease(LogicalRepWorkerLock);
3331 : }
3332 :
3333 : /*
3334 : * Return false if the leader apply worker has stopped retaining
3335 : * information for detecting conflicts. This implies that update_deleted
3336 : * can no longer be reliably detected.
3337 : */
3338 3 : if (!TransactionIdIsValid(oldestxmin))
3339 0 : return false;
3340 :
3341 4 : if (OidIsValid(localidxoid) &&
3342 1 : IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
3343 1 : return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
3344 : remoteslot, oldestxmin,
3345 : delete_xid, delete_origin,
3346 : delete_time);
3347 : else
3348 2 : return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
3349 : oldestxmin, delete_xid,
3350 : delete_origin, delete_time);
3351 : }
3352 :
3353 : /*
3354 : * This handles insert, update, delete on a partitioned table.
3355 : */
static void
apply_handle_tuple_routing(ApplyExecutionData *edata,
						   TupleTableSlot *remoteslot,
						   LogicalRepTupleData *newtup,
						   CmdType operation)
{
	EState	   *estate = edata->estate;
	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
	ResultRelInfo *relinfo = edata->targetRelInfo;
	Relation	parentrel = relinfo->ri_RelationDesc;
	ModifyTableState *mtstate;
	PartitionTupleRouting *proute;
	ResultRelInfo *partrelinfo;
	Relation	partrel;
	TupleTableSlot *remoteslot_part;
	TupleConversionMap *map;
	MemoryContext oldctx;
	LogicalRepRelMapEntry *part_entry = NULL;
	AttrMap    *attrmap = NULL;

	/* ModifyTableState is needed for ExecFindPartition(). */
	edata->mtstate = mtstate = makeNode(ModifyTableState);
	mtstate->ps.plan = NULL;
	mtstate->ps.state = estate;
	mtstate->operation = operation;
	mtstate->resultRelInfo = relinfo;

	/* ... as is PartitionTupleRouting. */
	edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);

	/*
	 * Find the partition to which the "search tuple" belongs.  Routing is
	 * done in the per-tuple context so any conversion garbage is reclaimed
	 * at the next tuple.
	 */
	Assert(remoteslot != NULL);
	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
									remoteslot, estate);
	Assert(partrelinfo != NULL);
	partrel = partrelinfo->ri_RelationDesc;

	/*
	 * Check for supported relkind. We need this since partitions might be of
	 * unsupported relkinds; and the set of partitions can change, so checking
	 * at CREATE/ALTER SUBSCRIPTION would be insufficient.
	 */
	CheckSubscriptionRelkind(partrel->rd_rel->relkind,
							 relmapentry->remoterel.relkind,
							 get_namespace_name(RelationGetNamespace(partrel)),
							 RelationGetRelationName(partrel));

	/*
	 * To perform any of the operations below, the tuple must match the
	 * partition's rowtype. Convert if needed or just copy, using a dedicated
	 * slot to store the tuple in any case.
	 */
	remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
	if (remoteslot_part == NULL)
		remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
	map = ExecGetRootToChildMap(partrelinfo, estate);
	if (map != NULL)
	{
		attrmap = map->attrMap;
		remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
												remoteslot_part);
	}
	else
	{
		remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
		slot_getallattrs(remoteslot_part);
	}
	MemoryContextSwitchTo(oldctx);

	/* Check if we can do the update or delete on the leaf partition. */
	if (operation == CMD_UPDATE || operation == CMD_DELETE)
	{
		/*
		 * part_entry supplies the remote-to-local attribute mapping and the
		 * replica identity index for the leaf partition.
		 */
		part_entry = logicalrep_partition_open(relmapentry, partrel,
											   attrmap);
		check_relation_updatable(part_entry);
	}

	switch (operation)
	{
		case CMD_INSERT:
			apply_handle_insert_internal(edata, partrelinfo,
										 remoteslot_part);
			break;

		case CMD_DELETE:
			apply_handle_delete_internal(edata, partrelinfo,
										 remoteslot_part,
										 part_entry->localindexoid);
			break;

		case CMD_UPDATE:

			/*
			 * For UPDATE, depending on whether or not the updated tuple
			 * satisfies the partition's constraint, perform a simple UPDATE
			 * of the partition or move the updated tuple into a different
			 * suitable partition.
			 */
			{
				TupleTableSlot *localslot;
				ResultRelInfo *partrelinfo_new;
				Relation	partrel_new;
				bool		found;
				EPQState	epqstate;
				ConflictTupleInfo conflicttuple = {0};

				/* Get the matching local tuple from the partition. */
				found = FindReplTupleInLocalRel(edata, partrel,
												&part_entry->remoterel,
												part_entry->localindexoid,
												remoteslot_part, &localslot);
				if (!found)
				{
					ConflictType type;
					TupleTableSlot *newslot = localslot;

					/*
					 * Detecting whether the tuple was recently deleted or
					 * never existed is crucial to avoid misleading the user
					 * during conflict handling.
					 */
					if (FindDeletedTupleInLocalRel(partrel,
												   part_entry->localindexoid,
												   remoteslot_part,
												   &conflicttuple.xmin,
												   &conflicttuple.origin,
												   &conflicttuple.ts) &&
						conflicttuple.origin != replorigin_xact_state.origin)
						type = CT_UPDATE_DELETED;
					else
						type = CT_UPDATE_MISSING;

					/* Store the new tuple for conflict reporting */
					slot_store_data(newslot, part_entry, newtup);

					/*
					 * The tuple to be updated could not be found or was
					 * deleted. Do nothing except for emitting a log message.
					 */
					ReportApplyConflict(estate, partrelinfo, LOG,
										type, remoteslot_part, newslot,
										list_make1(&conflicttuple));

					return;
				}

				/*
				 * Report the conflict if the tuple was modified by a
				 * different origin.
				 */
				if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
											&conflicttuple.origin,
											&conflicttuple.ts) &&
					conflicttuple.origin != replorigin_xact_state.origin)
				{
					TupleTableSlot *newslot;

					/* Store the new tuple for conflict reporting */
					newslot = table_slot_create(partrel, &estate->es_tupleTable);
					slot_store_data(newslot, part_entry, newtup);

					conflicttuple.slot = localslot;

					ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
										remoteslot_part, newslot,
										list_make1(&conflicttuple));
				}

				/*
				 * Apply the update to the local tuple, putting the result in
				 * remoteslot_part.
				 */
				oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
				slot_modify_data(remoteslot_part, localslot, part_entry,
								 newtup);
				MemoryContextSwitchTo(oldctx);

				EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);

				/*
				 * Does the updated tuple still satisfy the current
				 * partition's constraint?
				 */
				if (!partrel->rd_rel->relispartition ||
					ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
									   false))
				{
					/*
					 * Yes, so simply UPDATE the partition. We don't call
					 * apply_handle_update_internal() here, which would
					 * normally do the following work, to avoid repeating some
					 * work already done above to find the local tuple in the
					 * partition.
					 */
					InitConflictIndexes(partrelinfo);

					EvalPlanQualSetSlot(&epqstate, remoteslot_part);
					TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
										  ACL_UPDATE);
					ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
											 localslot, remoteslot_part);
				}
				else
				{
					/* Move the tuple into the new partition. */

					/*
					 * New partition will be found using tuple routing, which
					 * can only occur via the parent table. We might need to
					 * convert the tuple to the parent's rowtype. Note that
					 * this is the tuple found in the partition, not the
					 * original search tuple received by this function.
					 */
					if (map)
					{
						TupleConversionMap *PartitionToRootMap =
							convert_tuples_by_name(RelationGetDescr(partrel),
												   RelationGetDescr(parentrel));

						remoteslot =
							execute_attr_map_slot(PartitionToRootMap->attrMap,
												  remoteslot_part, remoteslot);
					}
					else
					{
						remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
						slot_getallattrs(remoteslot);
					}

					/* Find the new partition. */
					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
					partrelinfo_new = ExecFindPartition(mtstate, relinfo,
														proute, remoteslot,
														estate);
					MemoryContextSwitchTo(oldctx);
					Assert(partrelinfo_new != partrelinfo);
					partrel_new = partrelinfo_new->ri_RelationDesc;

					/* Check that new partition also has supported relkind. */
					CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
											 relmapentry->remoterel.relkind,
											 get_namespace_name(RelationGetNamespace(partrel_new)),
											 RelationGetRelationName(partrel_new));

					/* DELETE old tuple found in the old partition. */
					EvalPlanQualSetSlot(&epqstate, localslot);
					TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
					ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);

					/* INSERT new tuple into the new partition. */

					/*
					 * Convert the replacement tuple to match the destination
					 * partition rowtype.
					 */
					oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
					remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
					if (remoteslot_part == NULL)
						remoteslot_part = table_slot_create(partrel_new,
															&estate->es_tupleTable);
					map = ExecGetRootToChildMap(partrelinfo_new, estate);
					if (map != NULL)
					{
						remoteslot_part = execute_attr_map_slot(map->attrMap,
																remoteslot,
																remoteslot_part);
					}
					else
					{
						remoteslot_part = ExecCopySlot(remoteslot_part,
													   remoteslot);
						/*
						 * NOTE(review): this deforms remoteslot, unlike the
						 * analogous branch earlier which deforms
						 * remoteslot_part — looks intentional upstream, but
						 * worth confirming.
						 */
						slot_getallattrs(remoteslot);
					}
					MemoryContextSwitchTo(oldctx);
					apply_handle_insert_internal(edata, partrelinfo_new,
												 remoteslot_part);
				}

				EvalPlanQualEnd(&epqstate);
			}
			break;

		default:
			elog(ERROR, "unrecognized CmdType: %d", (int) operation);
			break;
	}
}
3646 :
3647 : /*
3648 : * Handle TRUNCATE message.
3649 : *
3650 : * TODO: FDW support
3651 : */
static void
apply_handle_truncate(StringInfo s)
{
	bool		cascade = false;
	bool		restart_seqs = false;
	List	   *remote_relids = NIL;
	List	   *remote_rels = NIL;
	List	   *rels = NIL;
	List	   *part_rels = NIL;
	List	   *relids = NIL;
	List	   *relids_logged = NIL;
	ListCell   *lc;
	LOCKMODE	lockmode = AccessExclusiveLock;

	/*
	 * Quick return if we are skipping data modification changes or handling
	 * streamed transactions.
	 */
	if (is_skipping_changes() ||
		handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
		return;

	begin_replication_step();

	remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);

	/*
	 * Open and lock each remote relation, collecting the relations (and
	 * their OIDs) that should actually be truncated locally.
	 */
	foreach(lc, remote_relids)
	{
		LogicalRepRelId relid = lfirst_oid(lc);
		LogicalRepRelMapEntry *rel;

		rel = logicalrep_rel_open(relid, lockmode);
		if (!should_apply_changes_for_rel(rel))
		{
			/*
			 * The relation can't become interesting in the middle of the
			 * transaction so it's safe to unlock it.
			 */
			logicalrep_rel_close(rel, lockmode);
			continue;
		}

		remote_rels = lappend(remote_rels, rel);
		TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
		rels = lappend(rels, rel->localrel);
		relids = lappend_oid(relids, rel->localreloid);
		/* Only relations with decoding enabled need WAL-logging of TRUNCATE */
		if (RelationIsLogicallyLogged(rel->localrel))
			relids_logged = lappend_oid(relids_logged, rel->localreloid);

		/*
		 * Truncate partitions if we got a message to truncate a partitioned
		 * table.
		 */
		if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
		{
			ListCell   *child;
			List	   *children = find_all_inheritors(rel->localreloid,
													   lockmode,
													   NULL);

			foreach(child, children)
			{
				Oid			childrelid = lfirst_oid(child);
				Relation	childrel;

				/* Skip partitions already collected via the remote list. */
				if (list_member_oid(relids, childrelid))
					continue;

				/* find_all_inheritors already got lock */
				childrel = table_open(childrelid, NoLock);

				/*
				 * Ignore temp tables of other backends. See similar code in
				 * ExecuteTruncate().
				 */
				if (RELATION_IS_OTHER_TEMP(childrel))
				{
					table_close(childrel, lockmode);
					continue;
				}

				TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
				rels = lappend(rels, childrel);
				part_rels = lappend(part_rels, childrel);
				relids = lappend_oid(relids, childrelid);
				/* Log this relation only if needed for logical decoding */
				if (RelationIsLogicallyLogged(childrel))
					relids_logged = lappend_oid(relids_logged, childrelid);
			}
		}
	}

	/*
	 * Even if we used CASCADE on the upstream primary we explicitly default
	 * to replaying changes without further cascading. This might be later
	 * changeable with a user specified option.
	 *
	 * MySubscription->runasowner tells us whether we want to execute
	 * replication actions as the subscription owner; the last argument to
	 * TruncateGuts tells it whether we want to switch to the table owner.
	 * Those are exactly opposite conditions.
	 */
	ExecuteTruncateGuts(rels,
						relids,
						relids_logged,
						DROP_RESTRICT,
						restart_seqs,
						!MySubscription->runasowner);

	/* Close relations; locks are held until transaction end (NoLock). */
	foreach(lc, remote_rels)
	{
		LogicalRepRelMapEntry *rel = lfirst(lc);

		logicalrep_rel_close(rel, NoLock);
	}
	foreach(lc, part_rels)
	{
		Relation	rel = lfirst(lc);

		table_close(rel, NoLock);
	}

	end_replication_step();
}
3775 :
3776 :
3777 : /*
3778 : * Logical replication protocol message dispatcher.
3779 : */
void
apply_dispatch(StringInfo s)
{
	LogicalRepMsgType action = pq_getmsgbyte(s);
	LogicalRepMsgType saved_command;

	/*
	 * Set the current command being applied. Since this function can be
	 * called recursively when applying spooled changes, save the current
	 * command.
	 */
	saved_command = apply_error_callback_arg.command;
	apply_error_callback_arg.command = action;

	/* Route each protocol message to its dedicated handler. */
	switch (action)
	{
		case LOGICAL_REP_MSG_BEGIN:
			apply_handle_begin(s);
			break;

		case LOGICAL_REP_MSG_COMMIT:
			apply_handle_commit(s);
			break;

		case LOGICAL_REP_MSG_INSERT:
			apply_handle_insert(s);
			break;

		case LOGICAL_REP_MSG_UPDATE:
			apply_handle_update(s);
			break;

		case LOGICAL_REP_MSG_DELETE:
			apply_handle_delete(s);
			break;

		case LOGICAL_REP_MSG_TRUNCATE:
			apply_handle_truncate(s);
			break;

		case LOGICAL_REP_MSG_RELATION:
			apply_handle_relation(s);
			break;

		case LOGICAL_REP_MSG_TYPE:
			apply_handle_type(s);
			break;

		case LOGICAL_REP_MSG_ORIGIN:
			apply_handle_origin(s);
			break;

		case LOGICAL_REP_MSG_MESSAGE:

			/*
			 * Logical replication does not use generic logical messages yet.
			 * Although, it could be used by other applications that use this
			 * output plugin.
			 */
			break;

		case LOGICAL_REP_MSG_STREAM_START:
			apply_handle_stream_start(s);
			break;

		case LOGICAL_REP_MSG_STREAM_STOP:
			apply_handle_stream_stop(s);
			break;

		case LOGICAL_REP_MSG_STREAM_ABORT:
			apply_handle_stream_abort(s);
			break;

		case LOGICAL_REP_MSG_STREAM_COMMIT:
			apply_handle_stream_commit(s);
			break;

		case LOGICAL_REP_MSG_BEGIN_PREPARE:
			apply_handle_begin_prepare(s);
			break;

		case LOGICAL_REP_MSG_PREPARE:
			apply_handle_prepare(s);
			break;

		case LOGICAL_REP_MSG_COMMIT_PREPARED:
			apply_handle_commit_prepared(s);
			break;

		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
			apply_handle_rollback_prepared(s);
			break;

		case LOGICAL_REP_MSG_STREAM_PREPARE:
			apply_handle_stream_prepare(s);
			break;

		default:
			/* Unknown byte: the stream is corrupt or from an unknown protocol */
			ereport(ERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("invalid logical replication message type \"??? (%d)\"", action)));
	}

	/* Reset the current command */
	apply_error_callback_arg.command = saved_command;
}
3886 :
3887 : /*
3888 : * Figure out which write/flush positions to report to the walsender process.
3889 : *
3890 : * We can't simply report back the last LSN the walsender sent us because the
3891 : * local transaction might not yet be flushed to disk locally. Instead we
3892 : * build a list that associates local with remote LSNs for every commit. When
3893 : * reporting back the flush position to the sender we iterate that list and
3894 : * check which entries on it are already locally flushed. Those we can report
3895 : * as having been flushed.
3896 : *
3897 : * The have_pending_txes is true if there are outstanding transactions that
3898 : * need to be flushed.
3899 : */
static void
get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
				   bool *have_pending_txes)
{
	dlist_mutable_iter iter;
	XLogRecPtr	local_flush = GetFlushRecPtr(NULL);

	*write = InvalidXLogRecPtr;
	*flush = InvalidXLogRecPtr;

	/*
	 * Walk lsn_mapping in commit order, discarding entries whose local
	 * transaction is already flushed to disk; the last discarded entry's
	 * remote LSN is what we can safely report as flushed.
	 */
	dlist_foreach_modify(iter, &lsn_mapping)
	{
		FlushPosition *pos =
			dlist_container(FlushPosition, node, iter.cur);

		*write = pos->remote_end;

		if (pos->local_end <= local_flush)
		{
			/* Locally durable: safe to report and drop the entry. */
			*flush = pos->remote_end;
			dlist_delete(iter.cur);
			pfree(pos);
		}
		else
		{
			/*
			 * Don't want to uselessly iterate over the rest of the list which
			 * could potentially be long. Instead get the last element and
			 * grab the write position from there.
			 */
			pos = dlist_tail_element(FlushPosition, node,
									 &lsn_mapping);
			*write = pos->remote_end;
			*have_pending_txes = true;
			return;
		}
	}

	/* Reached only when every entry was flushed (or the list was empty). */
	*have_pending_txes = !dlist_is_empty(&lsn_mapping);
}
3940 :
3941 : /*
3942 : * Store current remote/local lsn pair in the tracking list.
3943 : */
3944 : void
3945 560 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3946 : {
3947 : FlushPosition *flushpos;
3948 :
3949 : /*
3950 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3951 : * by the leader apply worker.
3952 : */
3953 560 : if (am_parallel_apply_worker())
3954 19 : return;
3955 :
3956 : /* Need to do this in permanent context */
3957 541 : MemoryContextSwitchTo(ApplyContext);
3958 :
3959 : /* Track commit lsn */
3960 541 : flushpos = palloc_object(FlushPosition);
3961 541 : flushpos->local_end = local_lsn;
3962 541 : flushpos->remote_end = remote_lsn;
3963 :
3964 541 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3965 541 : MemoryContextSwitchTo(ApplyMessageContext);
3966 : }
3967 :
3968 :
3969 : /* Update statistics of the worker. */
3970 : static void
3971 190917 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3972 : {
3973 190917 : MyLogicalRepWorker->last_lsn = last_lsn;
3974 190917 : MyLogicalRepWorker->last_send_time = send_time;
3975 190917 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3976 190917 : if (reply)
3977 : {
3978 1804 : MyLogicalRepWorker->reply_lsn = last_lsn;
3979 1804 : MyLogicalRepWorker->reply_time = send_time;
3980 : }
3981 190917 : }
3982 :
3983 : /*
3984 : * Apply main loop.
3985 : */
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
	bool		ping_sent = false;
	TimeLineID	tli;
	ErrorContextCallback errcallback;
	RetainDeadTuplesData rdt_data = {0};

	/*
	 * Init the ApplyMessageContext which we clean up after each replication
	 * protocol message.
	 */
	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
												"ApplyMessageContext",
												ALLOCSET_DEFAULT_SIZES);

	/*
	 * This memory context is used for per-stream data when the streaming mode
	 * is enabled. This context is reset on each stream stop.
	 */
	LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
													"LogicalStreamingContext",
													ALLOCSET_DEFAULT_SIZES);

	/* mark as idle, before starting to loop */
	pgstat_report_activity(STATE_IDLE, NULL);

	/*
	 * Push apply error context callback. Fields will be filled while applying
	 * a change.
	 */
	errcallback.callback = apply_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;
	apply_error_context_stack = error_context_stack;

	/* This outer loop iterates once per wait. */
	for (;;)
	{
		pgsocket	fd = PGINVALID_SOCKET;
		int			rc;
		int			len;
		char	   *buf = NULL;
		bool		endofstream = false;
		long		wait_time;

		CHECK_FOR_INTERRUPTS();

		MemoryContextSwitchTo(ApplyMessageContext);

		len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);

		if (len != 0)
		{
			/* Loop to process all available data (without blocking). */
			for (;;)
			{
				CHECK_FOR_INTERRUPTS();

				if (len == 0)
				{
					/* No more data available right now. */
					break;
				}
				else if (len < 0)
				{
					/* Negative length means the upstream closed the stream. */
					ereport(LOG,
							(errmsg("data stream from publisher has ended")));
					endofstream = true;
					break;
				}
				else
				{
					int			c;
					StringInfoData s;

					if (ConfigReloadPending)
					{
						ConfigReloadPending = false;
						ProcessConfigFile(PGC_SIGHUP);
					}

					/* Reset timeout. */
					last_recv_timestamp = GetCurrentTimestamp();
					ping_sent = false;

					rdt_data.last_recv_time = last_recv_timestamp;

					/* Ensure we are reading the data into our memory context. */
					MemoryContextSwitchTo(ApplyMessageContext);

					/* Wrap the received buffer for pq_getmsg* parsing. */
					initReadOnlyStringInfo(&s, buf, len);

					c = pq_getmsgbyte(&s);

					if (c == PqReplMsg_WALData)
					{
						XLogRecPtr	start_lsn;
						XLogRecPtr	end_lsn;
						TimestampTz send_time;

						start_lsn = pq_getmsgint64(&s);
						end_lsn = pq_getmsgint64(&s);
						send_time = pq_getmsgint64(&s);

						/* Track the furthest LSN seen so far. */
						if (last_received < start_lsn)
							last_received = start_lsn;

						if (last_received < end_lsn)
							last_received = end_lsn;

						UpdateWorkerStats(last_received, send_time, false);

						/* The remainder of 's' is a logical replication message. */
						apply_dispatch(&s);

						maybe_advance_nonremovable_xid(&rdt_data, false);
					}
					else if (c == PqReplMsg_Keepalive)
					{
						XLogRecPtr	end_lsn;
						TimestampTz timestamp;
						bool		reply_requested;

						end_lsn = pq_getmsgint64(&s);
						timestamp = pq_getmsgint64(&s);
						reply_requested = pq_getmsgbyte(&s);

						if (last_received < end_lsn)
							last_received = end_lsn;

						send_feedback(last_received, reply_requested, false);

						maybe_advance_nonremovable_xid(&rdt_data, false);

						UpdateWorkerStats(last_received, timestamp, true);
					}
					else if (c == PqReplMsg_PrimaryStatusUpdate)
					{
						rdt_data.remote_lsn = pq_getmsgint64(&s);
						rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
						rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
						rdt_data.reply_time = pq_getmsgint64(&s);

						/*
						 * This should never happen, see
						 * ProcessStandbyPSRequestMessage. But if it happens
						 * due to a bug, we don't want to proceed as it can
						 * incorrectly advance oldest_nonremovable_xid.
						 */
						if (!XLogRecPtrIsValid(rdt_data.remote_lsn))
							elog(ERROR, "cannot get the latest WAL position from the publisher");

						maybe_advance_nonremovable_xid(&rdt_data, true);

						UpdateWorkerStats(last_received, rdt_data.reply_time, false);
					}
					/* other message types are purposefully ignored */

					MemoryContextReset(ApplyMessageContext);
				}

				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
			}
		}

		/* confirm all writes so far */
		send_feedback(last_received, false, false);

		/* Reset the timestamp if no message was received */
		rdt_data.last_recv_time = 0;

		maybe_advance_nonremovable_xid(&rdt_data, false);

		if (!in_remote_transaction && !in_streamed_transaction)
		{
			/*
			 * If we didn't get any transactions for a while there might be
			 * unconsumed invalidation messages in the queue, consume them
			 * now.
			 */
			AcceptInvalidationMessages();
			maybe_reread_subscription();

			/*
			 * Process any relations that are being synchronized in parallel
			 * and any newly added tables or sequences.
			 */
			ProcessSyncingRelations(last_received);
		}

		/* Cleanup the memory. */
		MemoryContextReset(ApplyMessageContext);
		MemoryContextSwitchTo(TopMemoryContext);

		/* Check if we need to exit the streaming loop. */
		if (endofstream)
			break;

		/*
		 * Wait for more data or latch. If we have unflushed transactions,
		 * wake up after WalWriterDelay to see if they've been flushed yet (in
		 * which case we should send a feedback message). Otherwise, there's
		 * no particular urgency about waking up unless we get data or a
		 * signal.
		 */
		if (!dlist_is_empty(&lsn_mapping))
			wait_time = WalWriterDelay;
		else
			wait_time = NAPTIME_PER_CYCLE;

		/*
		 * Ensure to wake up when it's possible to advance the non-removable
		 * transaction ID, or when the retention duration may have exceeded
		 * max_retention_duration.
		 */
		if (MySubscription->retentionactive)
		{
			if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
				rdt_data.xid_advance_interval)
				wait_time = Min(wait_time, rdt_data.xid_advance_interval);
			else if (MySubscription->maxretention > 0)
				wait_time = Min(wait_time, MySubscription->maxretention);
		}

		rc = WaitLatchOrSocket(MyLatch,
							   WL_SOCKET_READABLE | WL_LATCH_SET |
							   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							   fd, wait_time,
							   WAIT_EVENT_LOGICAL_APPLY_MAIN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		if (rc & WL_TIMEOUT)
		{
			/*
			 * We didn't receive anything new. If we haven't heard anything
			 * from the server for more than wal_receiver_timeout / 2, ping
			 * the server. Also, if it's been longer than
			 * wal_receiver_status_interval since the last update we sent,
			 * send a status update to the primary anyway, to report any
			 * progress in applying WAL.
			 */
			bool		requestReply = false;

			/*
			 * Check if time since last receive from primary has reached the
			 * configured limit.
			 */
			if (wal_receiver_timeout > 0)
			{
				TimestampTz now = GetCurrentTimestamp();
				TimestampTz timeout;

				timeout =
					TimestampTzPlusMilliseconds(last_recv_timestamp,
												wal_receiver_timeout);

				if (now >= timeout)
					ereport(ERROR,
							(errcode(ERRCODE_CONNECTION_FAILURE),
							 errmsg("terminating logical replication worker due to timeout")));

				/* Check to see if it's time for a ping. */
				if (!ping_sent)
				{
					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
														  (wal_receiver_timeout / 2));
					if (now >= timeout)
					{
						requestReply = true;
						ping_sent = true;
					}
				}
			}

			send_feedback(last_received, requestReply, requestReply);

			maybe_advance_nonremovable_xid(&rdt_data, false);

			/*
			 * Force reporting to ensure long idle periods don't lead to
			 * arbitrarily delayed stats. Stats can only be reported outside
			 * of (implicit or explicit) transactions. That shouldn't lead to
			 * stats being delayed for long, because transactions are either
			 * sent as a whole on commit or streamed. Streamed transactions
			 * are spilled to disk and applied on commit.
			 */
			if (!IsTransactionState())
				pgstat_report_stat(true);
		}
	}

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;
	apply_error_context_stack = error_context_stack;

	/* All done */
	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
4295 :
4296 : /*
4297 : * Send a Standby Status Update message to server.
4298 : *
4299 : * 'recvpos' is the latest LSN we've received data to, force is set if we need
4300 : * to send a response to avoid timeouts.
4301 : */
4302 : static void
4303 29039 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
4304 : {
4305 : static StringInfo reply_message = NULL;
4306 : static TimestampTz send_time = 0;
4307 :
4308 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
4309 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
4310 :
4311 : XLogRecPtr writepos;
4312 : XLogRecPtr flushpos;
4313 : TimestampTz now;
4314 : bool have_pending_txes;
4315 :
4316 : /*
4317 : * If the user doesn't want status to be reported to the publisher, be
4318 : * sure to exit before doing anything at all.
4319 : */
4320 29039 : if (!force && wal_receiver_status_interval <= 0)
4321 10250 : return;
4322 :
4323 : /* It's legal to not pass a recvpos */
4324 29039 : if (recvpos < last_recvpos)
4325 0 : recvpos = last_recvpos;
4326 :
4327 29039 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4328 :
4329 : /*
4330 : * No outstanding transactions to flush, we can report the latest received
4331 : * position. This is important for synchronous replication.
4332 : */
4333 29039 : if (!have_pending_txes)
4334 26032 : flushpos = writepos = recvpos;
4335 :
4336 29039 : if (writepos < last_writepos)
4337 0 : writepos = last_writepos;
4338 :
4339 29039 : if (flushpos < last_flushpos)
4340 2976 : flushpos = last_flushpos;
4341 :
4342 29039 : now = GetCurrentTimestamp();
4343 :
4344 : /* if we've already reported everything we're good */
4345 29039 : if (!force &&
4346 29037 : writepos == last_writepos &&
4347 10524 : flushpos == last_flushpos &&
4348 10357 : !TimestampDifferenceExceeds(send_time, now,
4349 : wal_receiver_status_interval * 1000))
4350 10250 : return;
4351 18789 : send_time = now;
4352 :
4353 18789 : if (!reply_message)
4354 : {
4355 430 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4356 :
4357 430 : reply_message = makeStringInfo();
4358 430 : MemoryContextSwitchTo(oldctx);
4359 : }
4360 : else
4361 18359 : resetStringInfo(reply_message);
4362 :
4363 18789 : pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
4364 18789 : pq_sendint64(reply_message, recvpos); /* write */
4365 18789 : pq_sendint64(reply_message, flushpos); /* flush */
4366 18789 : pq_sendint64(reply_message, writepos); /* apply */
4367 18789 : pq_sendint64(reply_message, now); /* sendTime */
4368 18789 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
4369 :
4370 18789 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4371 : force,
4372 : LSN_FORMAT_ARGS(recvpos),
4373 : LSN_FORMAT_ARGS(writepos),
4374 : LSN_FORMAT_ARGS(flushpos));
4375 :
4376 18789 : walrcv_send(LogRepWorkerWalRcvConn,
4377 : reply_message->data, reply_message->len);
4378 :
4379 18789 : if (recvpos > last_recvpos)
4380 18510 : last_recvpos = recvpos;
4381 18789 : if (writepos > last_writepos)
4382 18515 : last_writepos = writepos;
4383 18789 : if (flushpos > last_flushpos)
4384 18281 : last_flushpos = flushpos;
4385 : }
4386 :
4387 : /*
4388 : * Attempt to advance the non-removable transaction ID.
4389 : *
4390 : * See comments atop worker.c for details.
4391 : */
4392 : static void
4393 218094 : maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
4394 : bool status_received)
4395 : {
4396 218094 : if (!can_advance_nonremovable_xid(rdt_data))
4397 213076 : return;
4398 :
4399 5018 : process_rdt_phase_transition(rdt_data, status_received);
4400 : }
4401 :
4402 : /*
4403 : * Preliminary check to determine if advancing the non-removable transaction ID
4404 : * is allowed.
4405 : */
4406 : static bool
4407 218094 : can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
4408 : {
4409 : /*
4410 : * It is sufficient to manage non-removable transaction ID for a
4411 : * subscription by the main apply worker to detect update_deleted reliably
4412 : * even for table sync or parallel apply workers.
4413 : */
4414 218094 : if (!am_leader_apply_worker())
4415 365 : return false;
4416 :
4417 : /* No need to advance if retaining dead tuples is not required */
4418 217729 : if (!MySubscription->retaindeadtuples)
4419 212711 : return false;
4420 :
4421 5018 : return true;
4422 : }
4423 :
4424 : /*
4425 : * Process phase transitions during the non-removable transaction ID
4426 : * advancement. See comments atop worker.c for details of the transition.
4427 : */
4428 : static void
4429 9171 : process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
4430 : bool status_received)
4431 : {
4432 9171 : switch (rdt_data->phase)
4433 : {
4434 183 : case RDT_GET_CANDIDATE_XID:
4435 183 : get_candidate_xid(rdt_data);
4436 183 : break;
4437 4083 : case RDT_REQUEST_PUBLISHER_STATUS:
4438 4083 : request_publisher_status(rdt_data);
4439 4083 : break;
4440 4797 : case RDT_WAIT_FOR_PUBLISHER_STATUS:
4441 4797 : wait_for_publisher_status(rdt_data, status_received);
4442 4797 : break;
4443 106 : case RDT_WAIT_FOR_LOCAL_FLUSH:
4444 106 : wait_for_local_flush(rdt_data);
4445 106 : break;
4446 1 : case RDT_STOP_CONFLICT_INFO_RETENTION:
4447 1 : stop_conflict_info_retention(rdt_data);
4448 1 : break;
4449 1 : case RDT_RESUME_CONFLICT_INFO_RETENTION:
4450 1 : resume_conflict_info_retention(rdt_data);
4451 0 : break;
4452 : }
4453 9170 : }
4454 :
4455 : /*
4456 : * Workhorse for the RDT_GET_CANDIDATE_XID phase.
4457 : */
4458 : static void
4459 183 : get_candidate_xid(RetainDeadTuplesData *rdt_data)
4460 : {
4461 : TransactionId oldest_running_xid;
4462 : TimestampTz now;
4463 :
4464 : /*
4465 : * Use last_recv_time when applying changes in the loop to avoid
4466 : * unnecessary system time retrieval. If last_recv_time is not available,
4467 : * obtain the current timestamp.
4468 : */
4469 183 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4470 :
4471 : /*
4472 : * Compute the candidate_xid and request the publisher status at most once
4473 : * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4474 : * details on how this value is dynamically adjusted. This is to avoid
4475 : * using CPU and network resources without making much progress.
4476 : */
4477 183 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4478 : rdt_data->xid_advance_interval))
4479 0 : return;
4480 :
4481 : /*
4482 : * Immediately update the timer, even if the function returns later
4483 : * without setting candidate_xid due to inactivity on the subscriber. This
4484 : * avoids frequent calls to GetOldestActiveTransactionId.
4485 : */
4486 183 : rdt_data->candidate_xid_time = now;
4487 :
4488 : /*
4489 : * Consider transactions in the current database, as only dead tuples from
4490 : * this database are required for conflict detection.
4491 : */
4492 183 : oldest_running_xid = GetOldestActiveTransactionId(false, false);
4493 :
4494 : /*
4495 : * Oldest active transaction ID (oldest_running_xid) can't be behind any
4496 : * of its previously computed value.
4497 : */
4498 : Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4499 : oldest_running_xid));
4500 :
4501 : /* Return if the oldest_nonremovable_xid cannot be advanced */
4502 183 : if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4503 : oldest_running_xid))
4504 : {
4505 140 : adjust_xid_advance_interval(rdt_data, false);
4506 140 : return;
4507 : }
4508 :
4509 43 : adjust_xid_advance_interval(rdt_data, true);
4510 :
4511 43 : rdt_data->candidate_xid = oldest_running_xid;
4512 43 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4513 :
4514 : /* process the next phase */
4515 43 : process_rdt_phase_transition(rdt_data, false);
4516 : }
4517 :
4518 : /*
4519 : * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4520 : */
4521 : static void
4522 4083 : request_publisher_status(RetainDeadTuplesData *rdt_data)
4523 : {
4524 : static StringInfo request_message = NULL;
4525 :
4526 4083 : if (!request_message)
4527 : {
4528 12 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4529 :
4530 12 : request_message = makeStringInfo();
4531 12 : MemoryContextSwitchTo(oldctx);
4532 : }
4533 : else
4534 4071 : resetStringInfo(request_message);
4535 :
4536 : /*
4537 : * Send the current time to update the remote walsender's latest reply
4538 : * message received time.
4539 : */
4540 4083 : pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
4541 4083 : pq_sendint64(request_message, GetCurrentTimestamp());
4542 :
4543 4083 : elog(DEBUG2, "sending publisher status request message");
4544 :
4545 : /* Send a request for the publisher status */
4546 4083 : walrcv_send(LogRepWorkerWalRcvConn,
4547 : request_message->data, request_message->len);
4548 :
4549 4083 : rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
4550 :
4551 : /*
4552 : * Skip calling maybe_advance_nonremovable_xid() since further transition
4553 : * is possible only once we receive the publisher status message.
4554 : */
4555 4083 : }
4556 :
4557 : /*
4558 : * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
4559 : */
4560 : static void
4561 4797 : wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
4562 : bool status_received)
4563 : {
4564 : /*
4565 : * Return if we have requested but not yet received the publisher status.
4566 : */
4567 4797 : if (!status_received)
4568 720 : return;
4569 :
4570 : /*
4571 : * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4572 : * retaining conflict information for this worker.
4573 : */
4574 4077 : if (should_stop_conflict_info_retention(rdt_data))
4575 : {
4576 1 : rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
4577 1 : return;
4578 : }
4579 :
4580 4076 : if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
4581 36 : rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4582 :
4583 : /*
4584 : * Check if all remote concurrent transactions that were active at the
4585 : * first status request have now completed. If completed, proceed to the
4586 : * next phase; otherwise, continue checking the publisher status until
4587 : * these transactions finish.
4588 : *
4589 : * It's possible that transactions in the commit phase during the last
4590 : * cycle have now finished committing, but remote_oldestxid remains older
4591 : * than remote_wait_for. This can happen if some old transaction came in
4592 : * the commit phase when we requested status in this cycle. We do not
4593 : * handle this case explicitly as it's rare and the benefit doesn't
4594 : * justify the required complexity. Tracking would require either caching
4595 : * all xids at the publisher or sending them to subscribers. The condition
4596 : * will resolve naturally once the remaining transactions are finished.
4597 : *
4598 : * Directly advancing the non-removable transaction ID is possible if
4599 : * there are no activities on the publisher since the last advancement
4600 : * cycle. However, it requires maintaining two fields, last_remote_nextxid
4601 : * and last_remote_lsn, within the structure for comparison with the
4602 : * current cycle's values. Considering the minimal cost of continuing in
4603 : * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4604 : * advance the transaction ID here.
4605 : */
4606 4076 : if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
4607 : rdt_data->remote_oldestxid))
4608 36 : rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
4609 : else
4610 4040 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4611 :
4612 : /* process the next phase */
4613 4076 : process_rdt_phase_transition(rdt_data, false);
4614 : }
4615 :
4616 : /*
4617 : * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
4618 : */
static void
wait_for_local_flush(RetainDeadTuplesData *rdt_data)
{
	/* Earlier phases must have supplied both values before we get here. */
	Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) &&
		   TransactionIdIsValid(rdt_data->candidate_xid));

	/*
	 * We expect the publisher and subscriber clocks to be in sync using time
	 * sync service like NTP. Otherwise, we will advance this worker's
	 * oldest_nonremovable_xid prematurely, leading to the removal of rows
	 * required to detect update_deleted reliably. This check primarily
	 * addresses scenarios where the publisher's clock falls behind; if the
	 * publisher's clock is ahead, subsequent transactions will naturally bear
	 * later commit timestamps, conforming to the design outlined atop
	 * worker.c.
	 *
	 * XXX Consider waiting for the publisher's clock to catch up with the
	 * subscriber's before proceeding to the next phase.
	 */
	if (TimestampDifferenceExceeds(rdt_data->reply_time,
								   rdt_data->candidate_xid_time, 0))
		ereport(ERROR,
				errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
				errdetail_internal("The clock on the publisher is behind that of the subscriber."));

	/*
	 * Do not attempt to advance the non-removable transaction ID when table
	 * sync is in progress. During this time, changes from a single
	 * transaction may be applied by multiple table sync workers corresponding
	 * to the target tables. So, it's necessary for all table sync workers to
	 * apply and flush the corresponding changes before advancing the
	 * transaction ID, otherwise, dead tuples that are still needed for
	 * conflict detection in table sync workers could be removed prematurely.
	 * However, confirming the apply and flush progress across all table sync
	 * workers is complex and not worth the effort, so we simply return if not
	 * all tables are in the READY state.
	 *
	 * Advancing the transaction ID is necessary even when no tables are
	 * currently subscribed, to avoid retaining dead tuples unnecessarily.
	 * While it might seem safe to skip all phases and directly assign
	 * candidate_xid to oldest_nonremovable_xid during the
	 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
	 * concurrently add tables to the subscription, the apply worker may not
	 * process invalidations in time. Consequently,
	 * HasSubscriptionTablesCached() might miss the new tables, leading to
	 * premature advancement of oldest_nonremovable_xid.
	 *
	 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
	 * invalidations are guaranteed to be processed before applying changes
	 * from newly added tables while waiting for the local flush to reach
	 * remote_lsn.
	 *
	 * Additionally, even if we check for subscription tables during
	 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
	 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
	 * subscription tables at this stage to prevent unnecessary tuple
	 * retention.
	 */
	if (HasSubscriptionTablesCached() && !AllTablesyncsReady())
	{
		TimestampTz now;

		now = rdt_data->last_recv_time
			? rdt_data->last_recv_time : GetCurrentTimestamp();

		/*
		 * Record the time spent waiting for table sync, it is needed for the
		 * timeout check in should_stop_conflict_info_retention().
		 */
		rdt_data->table_sync_wait_time =
			TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);

		return;
	}

	/*
	 * We don't need to maintain oldest_nonremovable_xid if we decide to stop
	 * retaining conflict information for this worker.
	 */
	if (should_stop_conflict_info_retention(rdt_data))
	{
		rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
		return;
	}

	/*
	 * Update and check the remote flush position if we are applying changes
	 * in a loop. This is done at most once per WalWriterDelay to avoid
	 * performing costly operations in get_flush_position() too frequently
	 * during change application.
	 */
	if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
		TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
								   rdt_data->last_recv_time, WalWriterDelay))
	{
		XLogRecPtr	writepos;
		XLogRecPtr	flushpos;
		bool		have_pending_txes;

		/* Fetch the latest remote flush position */
		get_flush_position(&writepos, &flushpos, &have_pending_txes);

		if (flushpos > last_flushpos)
			last_flushpos = flushpos;

		rdt_data->flushpos_update_time = rdt_data->last_recv_time;
	}

	/* Return to wait for the changes to be applied */
	if (last_flushpos < rdt_data->remote_lsn)
		return;

	/*
	 * Reaching this point implies should_stop_conflict_info_retention()
	 * returned false earlier, meaning that the most recent duration for
	 * advancing the non-removable transaction ID is within the
	 * max_retention_duration or max_retention_duration is set to 0.
	 *
	 * Therefore, if conflict info retention was previously stopped due to a
	 * timeout, it is now safe to resume retention.
	 */
	if (!MySubscription->retentionactive)
	{
		rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
		return;
	}

	/*
	 * Reaching here means the remote WAL position has been received, and all
	 * transactions up to that position on the publisher have been applied and
	 * flushed locally. So, we can advance the non-removable transaction ID.
	 *
	 * The spinlock publishes the new value to other processes; presumably
	 * the launcher reads it when updating the conflict slot's xmin — TODO
	 * confirm against the launcher code.
	 */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
		 LSN_FORMAT_ARGS(rdt_data->remote_lsn),
		 rdt_data->candidate_xid);

	/* Notify launcher to update the xmin of the conflict slot */
	ApplyLauncherWakeup();

	/* Start a fresh advancement cycle from RDT_GET_CANDIDATE_XID. */
	reset_retention_data_fields(rdt_data);

	/* process the next phase */
	process_rdt_phase_transition(rdt_data, false);
}
4767 :
4768 : /*
4769 : * Check whether conflict information retention should be stopped due to
4770 : * exceeding the maximum wait time (max_retention_duration).
4771 : *
4772 : * If retention should be stopped, return true. Otherwise, return false.
4773 : */
4774 : static bool
4775 4165 : should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4776 : {
4777 : TimestampTz now;
4778 :
4779 : Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4780 : Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
4781 : rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
4782 :
4783 4165 : if (!MySubscription->maxretention)
4784 4164 : return false;
4785 :
4786 : /*
4787 : * Use last_recv_time when applying changes in the loop to avoid
4788 : * unnecessary system time retrieval. If last_recv_time is not available,
4789 : * obtain the current timestamp.
4790 : */
4791 1 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4792 :
4793 : /*
4794 : * Return early if the wait time has not exceeded the configured maximum
4795 : * (max_retention_duration). Time spent waiting for table synchronization
4796 : * is excluded from this calculation, as it occurs infrequently.
4797 : */
4798 1 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4799 1 : MySubscription->maxretention +
4800 1 : rdt_data->table_sync_wait_time))
4801 0 : return false;
4802 :
4803 1 : return true;
4804 : }
4805 :
4806 : /*
4807 : * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
4808 : */
4809 : static void
4810 1 : stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4811 : {
4812 : /* Stop retention if not yet */
4813 1 : if (MySubscription->retentionactive)
4814 : {
4815 : /*
4816 : * If the retention status cannot be updated (e.g., due to active
4817 : * transaction), skip further processing to avoid inconsistent
4818 : * retention behavior.
4819 : */
4820 1 : if (!update_retention_status(false))
4821 0 : return;
4822 :
4823 1 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
4824 1 : MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
4825 1 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
4826 :
4827 1 : ereport(LOG,
4828 : errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
4829 : MySubscription->name),
4830 : errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
4831 : }
4832 :
4833 : Assert(!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid));
4834 :
4835 : /*
4836 : * If retention has been stopped, reset to the initial phase to retry
4837 : * resuming retention. This reset is required to recalculate the current
4838 : * wait time and resume retention if the time falls within
4839 : * max_retention_duration.
4840 : */
4841 1 : reset_retention_data_fields(rdt_data);
4842 : }
4843 :
4844 : /*
4845 : * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
4846 : */
4847 : static void
4848 1 : resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4849 : {
4850 : /* We can't resume retention without updating retention status. */
4851 1 : if (!update_retention_status(true))
4852 0 : return;
4853 :
4854 1 : ereport(LOG,
4855 : errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
4856 : MySubscription->name),
4857 : MySubscription->maxretention
4858 : ? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
4859 : : errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));
4860 :
4861 : /*
4862 : * Restart the worker to let the launcher initialize
4863 : * oldest_nonremovable_xid at startup.
4864 : *
4865 : * While it's technically possible to derive this value on-the-fly using
4866 : * the conflict detection slot's xmin, doing so risks a race condition:
4867 : * the launcher might clean slot.xmin just after retention resumes. This
4868 : * would make oldest_nonremovable_xid unreliable, especially during xid
4869 : * wraparound.
4870 : *
4871 : * Although this can be prevented by introducing heavy weight locking, the
4872 : * complexity it will bring doesn't seem worthwhile given how rarely
4873 : * retention is resumed.
4874 : */
4875 1 : apply_worker_exit();
4876 : }
4877 :
4878 : /*
4879 : * Updates pg_subscription.subretentionactive to the given value within a
4880 : * new transaction.
4881 : *
4882 : * If already inside an active transaction, skips the update and returns
4883 : * false.
4884 : *
4885 : * Returns true if the update is successfully performed.
4886 : */
4887 : static bool
4888 2 : update_retention_status(bool active)
4889 : {
4890 : /*
4891 : * Do not update the catalog during an active transaction. The transaction
4892 : * may be started during change application, leading to a possible
4893 : * rollback of catalog updates if the application fails subsequently.
4894 : */
4895 2 : if (IsTransactionState())
4896 0 : return false;
4897 :
4898 2 : StartTransactionCommand();
4899 :
4900 : /*
4901 : * Updating pg_subscription might involve TOAST table access, so ensure we
4902 : * have a valid snapshot.
4903 : */
4904 2 : PushActiveSnapshot(GetTransactionSnapshot());
4905 :
4906 : /* Update pg_subscription.subretentionactive */
4907 2 : UpdateDeadTupleRetentionStatus(MySubscription->oid, active);
4908 :
4909 2 : PopActiveSnapshot();
4910 2 : CommitTransactionCommand();
4911 :
4912 : /* Notify launcher to update the conflict slot */
4913 2 : ApplyLauncherWakeup();
4914 :
4915 2 : MySubscription->retentionactive = active;
4916 :
4917 2 : return true;
4918 : }
4919 :
4920 : /*
4921 : * Reset all data fields of RetainDeadTuplesData except those used to
4922 : * determine the timing for the next round of transaction ID advancement. We
4923 : * can even use flushpos_update_time in the next round to decide whether to get
4924 : * the latest flush position.
4925 : */
4926 : static void
4927 35 : reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
4928 : {
4929 35 : rdt_data->phase = RDT_GET_CANDIDATE_XID;
4930 35 : rdt_data->remote_lsn = InvalidXLogRecPtr;
4931 35 : rdt_data->remote_oldestxid = InvalidFullTransactionId;
4932 35 : rdt_data->remote_nextxid = InvalidFullTransactionId;
4933 35 : rdt_data->reply_time = 0;
4934 35 : rdt_data->remote_wait_for = InvalidFullTransactionId;
4935 35 : rdt_data->candidate_xid = InvalidTransactionId;
4936 35 : rdt_data->table_sync_wait_time = 0;
4937 35 : }
4938 :
4939 : /*
4940 : * Adjust the interval for advancing non-removable transaction IDs.
4941 : *
4942 : * If there is no activity on the node or retention has been stopped, we
4943 : * progressively double the interval used to advance non-removable transaction
4944 : * ID. This helps conserve CPU and network resources when there's little benefit
4945 : * to frequent updates.
4946 : *
4947 : * The interval is capped by the lowest of the following:
4948 : * - wal_receiver_status_interval (if set and retention is active),
4949 : * - a default maximum of 3 minutes,
4950 : * - max_retention_duration (if retention is active).
4951 : *
4952 : * This ensures the interval never exceeds the retention boundary, even if other
4953 : * limits are higher. Once activity resumes on the node and the retention is
4954 : * active, the interval is reset to lesser of 100ms and max_retention_duration,
4955 : * allowing timely advancement of non-removable transaction ID.
4956 : *
4957 : * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4958 : * consider the other interval or a separate GUC if the need arises.
4959 : */
static void
adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
{
	/*
	 * NOTE(review): branch order matters here.  When retention is inactive
	 * AND no new XID was found, this first branch (not the retention-stopped
	 * branch below) applies, so the wal_receiver_status_interval cap is used
	 * in that case — confirm this ordering is intended.
	 */
	if (rdt_data->xid_advance_interval && !new_xid_found)
	{
		int			max_interval = wal_receiver_status_interval
			? wal_receiver_status_interval * 1000
			: MAX_XID_ADVANCE_INTERVAL;

		/*
		 * No new transaction ID has been assigned since the last check, so
		 * double the interval, but not beyond the maximum allowable value.
		 */
		rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
											 max_interval);
	}
	else if (rdt_data->xid_advance_interval &&
			 !MySubscription->retentionactive)
	{
		/*
		 * Retention has been stopped, so double the interval-capped at a
		 * maximum of 3 minutes. The wal_receiver_status_interval is
		 * intentionally not used as a upper bound, since the likelihood of
		 * retention resuming is lower than that of general activity resuming.
		 */
		rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
											 MAX_XID_ADVANCE_INTERVAL);
	}
	else
	{
		/*
		 * A new transaction ID was found or the interval is not yet
		 * initialized, so set the interval to the minimum value.
		 */
		rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
	}

	/*
	 * Ensure the wait time remains within the maximum retention time limit
	 * when retention is active.
	 */
	if (MySubscription->retentionactive)
		rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
											 MySubscription->maxretention);
}
5005 :
5006 : /*
5007 : * Exit routine for apply workers due to subscription parameter changes.
5008 : */
5009 : static void
5010 45 : apply_worker_exit(void)
5011 : {
5012 45 : if (am_parallel_apply_worker())
5013 : {
5014 : /*
5015 : * Don't stop the parallel apply worker as the leader will detect the
5016 : * subscription parameter change and restart logical replication later
5017 : * anyway. This also prevents the leader from reporting errors when
5018 : * trying to communicate with a stopped parallel apply worker, which
5019 : * would accidentally disable subscriptions if disable_on_error was
5020 : * set.
5021 : */
5022 0 : return;
5023 : }
5024 :
5025 : /*
5026 : * Reset the last-start time for this apply worker so that the launcher
5027 : * will restart it without waiting for wal_retrieve_retry_interval if the
5028 : * subscription is still active, and so that we won't leak that hash table
5029 : * entry if it isn't.
5030 : */
5031 45 : if (am_leader_apply_worker())
5032 45 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5033 :
5034 45 : proc_exit(0);
5035 : }
5036 :
5037 : /*
5038 : * Reread subscription info if needed.
5039 : *
5040 : * For significant changes, we react by exiting the current process; a new
5041 : * one will be launched afterwards if needed.
5042 : */
void
maybe_reread_subscription(void)
{
	MemoryContext oldctx;
	Subscription *newsub;
	bool		started_tx = false;	/* did we open the transaction here? */

	/* When cache state is valid there is nothing to do here. */
	if (MySubscriptionValid)
		return;

	/* This function might be called inside or outside of transaction. */
	if (!IsTransactionState())
	{
		StartTransactionCommand();
		started_tx = true;
	}

	/* Ensure allocations in permanent context. */
	oldctx = MemoryContextSwitchTo(ApplyContext);

	newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);

	/*
	 * Exit if the subscription was removed. This normally should not happen
	 * as the worker gets killed during DROP SUBSCRIPTION.
	 */
	if (!newsub)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
						MySubscription->name)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (am_leader_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

		proc_exit(0);
	}

	/*
	 * Exit if the subscription was disabled.  Note that apply_worker_exit()
	 * does not return for a leader apply worker (it calls proc_exit), but a
	 * parallel apply worker returns from it and continues below.
	 */
	if (!newsub->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
						MySubscription->name)));

		apply_worker_exit();
	}

	/* !slotname should never happen when enabled is true. */
	Assert(newsub->slotname);

	/* two-phase cannot be altered while the worker is running */
	Assert(newsub->twophasestate == MySubscription->twophasestate);

	/*
	 * Exit if any parameter that affects the remote connection was changed.
	 * The launcher will start a new worker but note that the parallel apply
	 * worker won't restart if the streaming option's value is changed from
	 * 'parallel' to any other value or the server decides not to stream the
	 * in-progress transaction.
	 */
	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
		strcmp(newsub->name, MySubscription->name) != 0 ||
		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
		newsub->binary != MySubscription->binary ||
		newsub->stream != MySubscription->stream ||
		newsub->passwordrequired != MySubscription->passwordrequired ||
		strcmp(newsub->origin, MySubscription->origin) != 0 ||
		newsub->owner != MySubscription->owner ||
		!equal(newsub->publications, MySubscription->publications))
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
							MySubscription->name)));
		else
			ereport(LOG,
					(errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
							MySubscription->name)));

		apply_worker_exit();
	}

	/*
	 * Exit if the subscription owner's superuser privileges have been
	 * revoked.
	 */
	if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
	{
		if (am_parallel_apply_worker())
			ereport(LOG,
					errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
						   MySubscription->name));
		else
			ereport(LOG,
					errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
						   MySubscription->name));

		apply_worker_exit();
	}

	/* Check for other changes that should never happen too. */
	if (newsub->dbid != MySubscription->dbid)
	{
		elog(ERROR, "subscription %u changed unexpectedly",
			 MyLogicalRepWorker->subid);
	}

	/* Clean old subscription info and switch to new one. */
	FreeSubscription(MySubscription);
	MySubscription = newsub;

	MemoryContextSwitchTo(oldctx);

	/* Change synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Change wal_receiver_timeout according to the user's wishes */
	set_wal_receiver_timeout();

	if (started_tx)
		CommitTransactionCommand();

	/* Cache is now consistent with the catalog again. */
	MySubscriptionValid = true;
}
5171 :
5172 : /*
5173 : * Change wal_receiver_timeout to MySubscription->walrcvtimeout.
5174 : */
5175 : static void
5176 552 : set_wal_receiver_timeout(void)
5177 : {
5178 : bool parsed;
5179 : int val;
5180 552 : int prev_timeout = wal_receiver_timeout;
5181 :
5182 : /*
5183 : * Set the wal_receiver_timeout GUC to MySubscription->walrcvtimeout,
5184 : * which comes from the subscription's wal_receiver_timeout option. If the
5185 : * value is -1, reset the GUC to its default, meaning it will inherit from
5186 : * the server config, command line, or role/database settings.
5187 : */
5188 552 : parsed = parse_int(MySubscription->walrcvtimeout, &val, 0, NULL);
5189 552 : if (parsed && val == -1)
5190 552 : SetConfigOption("wal_receiver_timeout", NULL,
5191 : PGC_BACKEND, PGC_S_SESSION);
5192 : else
5193 0 : SetConfigOption("wal_receiver_timeout", MySubscription->walrcvtimeout,
5194 : PGC_BACKEND, PGC_S_SESSION);
5195 :
5196 : /*
5197 : * Log the wal_receiver_timeout setting (in milliseconds) as a debug
5198 : * message when it changes, to verify it was set correctly.
5199 : */
5200 552 : if (prev_timeout != wal_receiver_timeout)
5201 0 : elog(DEBUG1, "logical replication worker for subscription \"%s\" wal_receiver_timeout: %d ms",
5202 : MySubscription->name, wal_receiver_timeout);
5203 552 : }
5204 :
/*
 * Callback from subscription syscache invalidation. Also needed for server or
 * user mapping invalidation, which can change the connection information for
 * subscriptions that connect using a server object.
 */
static void
subscription_change_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
{
	/*
	 * Just flag the cached subscription data as stale; the arguments are
	 * ignored because any matching invalidation forces a full re-read of the
	 * subscription at the next safe point.
	 */
	MySubscriptionValid = false;
}
5215 :
/*
 * subxact_info_write
 *	  Store information about subxacts for a toplevel transaction.
 *
 * For each subxact we store offset of its first change in the main file.
 * The file is always over-written as a whole.
 *
 * XXX We should only store subxacts that were not aborted yet.
 */
static void
subxact_info_write(Oid subid, TransactionId xid)
{
	char		path[MAXPGPATH];
	Size		len;
	BufFile    *fd;

	Assert(TransactionIdIsValid(xid));

	/* construct the subxact filename */
	subxact_filename(path, subid, xid);

	/*
	 * Delete the subxacts file, if exists.  With no subxacts to record there
	 * is nothing to write, so just reset the in-memory state and remove any
	 * stale file (missing_ok = true, since it may never have been created).
	 */
	if (subxact_data.nsubxacts == 0)
	{
		cleanup_subxact_info();
		BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);

		return;
	}

	/*
	 * Create the subxact file if it not already created, otherwise open the
	 * existing file.
	 */
	fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
							true);
	if (fd == NULL)
		fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);

	len = sizeof(SubXactInfo) * subxact_data.nsubxacts;

	/*
	 * Write the subxact count and subxact info.  This layout must match what
	 * subxact_info_read() expects: count first, then the array of entries.
	 */
	BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
	BufFileWrite(fd, subxact_data.subxacts, len);

	BufFileClose(fd);

	/* free the memory allocated for subxact info */
	cleanup_subxact_info();
}
5266 :
/*
 * subxact_info_read
 *	  Restore information about subxacts of a streamed transaction.
 *
 * Read information about subxacts into the structure subxact_data that can be
 * used later.
 */
static void
subxact_info_read(Oid subid, TransactionId xid)
{
	char		path[MAXPGPATH];
	Size		len;
	BufFile    *fd;
	MemoryContext oldctx;

	/* Caller must not have leftover subxact state from a previous stream. */
	Assert(!subxact_data.subxacts);
	Assert(subxact_data.nsubxacts == 0);
	Assert(subxact_data.nsubxacts_max == 0);

	/*
	 * If the subxact file doesn't exist that means we don't have any subxact
	 * info.
	 */
	subxact_filename(path, subid, xid);
	fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
							true);
	if (fd == NULL)
		return;

	/* read number of subxact items */
	BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));

	len = sizeof(SubXactInfo) * subxact_data.nsubxacts;

	/* we keep the maximum as a power of 2 */
	subxact_data.nsubxacts_max = 1 << pg_ceil_log2_32(subxact_data.nsubxacts);

	/*
	 * Allocate subxact information in the logical streaming context. We need
	 * this information during the complete stream so that we can add the sub
	 * transaction info to this. On stream stop we will flush this information
	 * to the subxact file and reset the logical streaming context.
	 */
	oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
	subxact_data.subxacts = palloc_array(SubXactInfo,
										 subxact_data.nsubxacts_max);
	MemoryContextSwitchTo(oldctx);

	/* read the subxact entries, if any (layout matches subxact_info_write) */
	if (len > 0)
		BufFileReadExact(fd, subxact_data.subxacts, len);

	BufFileClose(fd);
}
5320 :
/*
 * subxact_info_add
 *	  Add information about a subxact (offset in the main file).
 *
 * Records the position of the subxact's first change in the serialized
 * changes file, so an abort of the subxact can later truncate the file back
 * to that position.
 */
static void
subxact_info_add(TransactionId xid)
{
	SubXactInfo *subxacts = subxact_data.subxacts;
	int64		i;

	/* We must have a valid top level stream xid and a stream fd. */
	Assert(TransactionIdIsValid(stream_xid));
	Assert(stream_fd != NULL);

	/*
	 * If the XID matches the toplevel transaction, we don't want to add it.
	 */
	if (stream_xid == xid)
		return;

	/*
	 * In most cases we're checking the same subxact as we've already seen in
	 * the last call, so make sure to ignore it (this change comes later).
	 */
	if (subxact_data.subxact_last == xid)
		return;

	/* OK, remember we're processing this XID. */
	subxact_data.subxact_last = xid;

	/*
	 * Check if the transaction is already present in the array of subxact. We
	 * intentionally scan the array from the tail, because we're likely adding
	 * a change for the most recent subtransactions.
	 *
	 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
	 * would allow us to use binary search here.
	 */
	for (i = subxact_data.nsubxacts; i > 0; i--)
	{
		/* found, so we're done */
		if (subxacts[i - 1].xid == xid)
			return;
	}

	/* This is a new subxact, so we need to add it to the array. */
	if (subxact_data.nsubxacts == 0)
	{
		MemoryContext oldctx;

		/* initial capacity; grown by doubling below */
		subxact_data.nsubxacts_max = 128;

		/*
		 * Allocate this memory for subxacts in per-stream context, see
		 * subxact_info_read.
		 */
		oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
		subxacts = palloc_array(SubXactInfo, subxact_data.nsubxacts_max);
		MemoryContextSwitchTo(oldctx);
	}
	else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
	{
		/* array full: double the capacity */
		subxact_data.nsubxacts_max *= 2;
		subxacts = repalloc_array(subxacts, SubXactInfo,
								  subxact_data.nsubxacts_max);
	}

	subxacts[subxact_data.nsubxacts].xid = xid;

	/*
	 * Get the current offset of the stream file and store it as offset of
	 * this subxact.
	 */
	BufFileTell(stream_fd,
				&subxacts[subxact_data.nsubxacts].fileno,
				&subxacts[subxact_data.nsubxacts].offset);

	subxact_data.nsubxacts++;
	subxact_data.subxacts = subxacts;
}
5401 :
/* format filename for file containing the info about subxacts */
static inline void
subxact_filename(char *path, Oid subid, TransactionId xid)
{
	/*
	 * The name pairs the subscription OID with the toplevel XID so that
	 * different workers handling a remote transaction with the same XID
	 * don't collide.  path must be at least MAXPGPATH bytes.
	 */
	snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
}
5408 :
/* format filename for file containing serialized changes */
static inline void
changes_filename(char *path, Oid subid, TransactionId xid)
{
	/* Same naming scheme as subxact_filename, different suffix. */
	snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
}
5415 :
5416 : /*
5417 : * stream_cleanup_files
5418 : * Cleanup files for a subscription / toplevel transaction.
5419 : *
5420 : * Remove files with serialized changes and subxact info for a particular
5421 : * toplevel transaction. Each subscription has a separate set of files
5422 : * for any toplevel transaction.
5423 : */
5424 : void
5425 31 : stream_cleanup_files(Oid subid, TransactionId xid)
5426 : {
5427 : char path[MAXPGPATH];
5428 :
5429 : /* Delete the changes file. */
5430 31 : changes_filename(path, subid, xid);
5431 31 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
5432 :
5433 : /* Delete the subxact file, if it exists. */
5434 31 : subxact_filename(path, subid, xid);
5435 31 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5436 31 : }
5437 :
/*
 * stream_open_file
 *	  Open a file that we'll use to serialize changes for a toplevel
 *	  transaction.
 *
 * Open a file for streamed changes from a toplevel transaction identified
 * by stream_xid (global variable). If it's the first chunk of streamed
 * changes for this transaction, create the buffile, otherwise open the
 * previously created file.
 */
static void
stream_open_file(Oid subid, TransactionId xid, bool first_segment)
{
	char		path[MAXPGPATH];
	MemoryContext oldcxt;

	Assert(OidIsValid(subid));
	Assert(TransactionIdIsValid(xid));
	Assert(stream_fd == NULL);


	changes_filename(path, subid, xid);
	elog(DEBUG1, "opening file \"%s\" for streamed changes", path);

	/*
	 * Create/open the buffiles under the logical streaming context so that we
	 * have those files until stream stop.
	 */
	oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);

	/*
	 * If this is the first streamed segment, create the changes file.
	 * Otherwise, just open the file for writing, in append mode.
	 */
	if (first_segment)
		stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
										 path);
	else
	{
		/*
		 * Open the file and seek to the end of the file because we always
		 * append the changes file.
		 */
		stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
									   path, O_RDWR, false);
		BufFileSeek(stream_fd, 0, 0, SEEK_END);
	}

	MemoryContextSwitchTo(oldcxt);
}
5488 :
5489 : /*
5490 : * stream_close_file
5491 : * Close the currently open file with streamed changes.
5492 : */
5493 : static void
5494 393 : stream_close_file(void)
5495 : {
5496 : Assert(stream_fd != NULL);
5497 :
5498 393 : BufFileClose(stream_fd);
5499 :
5500 393 : stream_fd = NULL;
5501 393 : }
5502 :
5503 : /*
5504 : * stream_write_change
5505 : * Serialize a change to a file for the current toplevel transaction.
5506 : *
5507 : * The change is serialized in a simple format, with length (not including
5508 : * the length), action code (identifying the message type) and message
5509 : * contents (without the subxact TransactionId value).
5510 : */
5511 : static void
5512 107553 : stream_write_change(char action, StringInfo s)
5513 : {
5514 : int len;
5515 :
5516 : Assert(stream_fd != NULL);
5517 :
5518 : /* total on-disk size, including the action type character */
5519 107553 : len = (s->len - s->cursor) + sizeof(char);
5520 :
5521 : /* first write the size */
5522 107553 : BufFileWrite(stream_fd, &len, sizeof(len));
5523 :
5524 : /* then the action */
5525 107553 : BufFileWrite(stream_fd, &action, sizeof(action));
5526 :
5527 : /* and finally the remaining part of the buffer (after the XID) */
5528 107553 : len = (s->len - s->cursor);
5529 :
5530 107553 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
5531 107553 : }
5532 :
/*
 * stream_open_and_write_change
 *	  Serialize a message to a file for the given transaction.
 *
 * This function is similar to stream_write_change except that it will open the
 * target file if not already before writing the message and close the file at
 * the end.
 */
static void
stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
{
	/* only used outside of a streamed-transaction block */
	Assert(!in_streamed_transaction);

	/* open the file for this xid unless one is already open */
	if (!stream_fd)
		stream_start_internal(xid, false);

	stream_write_change(action, s);
	stream_stop_internal(xid);
}
5552 :
5553 : /*
5554 : * Sets streaming options including replication slot name and origin start
5555 : * position. Workers need these options for logical replication.
5556 : */
5557 : void
5558 430 : set_stream_options(WalRcvStreamOptions *options,
5559 : char *slotname,
5560 : XLogRecPtr *origin_startpos)
5561 : {
5562 : int server_version;
5563 :
5564 430 : options->logical = true;
5565 430 : options->startpoint = *origin_startpos;
5566 430 : options->slotname = slotname;
5567 :
5568 430 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
5569 430 : options->proto.logical.proto_version =
5570 430 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
5571 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
5572 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
5573 : LOGICALREP_PROTO_VERSION_NUM;
5574 :
5575 430 : options->proto.logical.publication_names = MySubscription->publications;
5576 430 : options->proto.logical.binary = MySubscription->binary;
5577 :
5578 : /*
5579 : * Assign the appropriate option value for streaming option according to
5580 : * the 'streaming' mode and the publisher's ability to support that mode.
5581 : */
5582 430 : if (server_version >= 160000 &&
5583 430 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
5584 : {
5585 397 : options->proto.logical.streaming_str = "parallel";
5586 397 : MyLogicalRepWorker->parallel_apply = true;
5587 : }
5588 33 : else if (server_version >= 140000 &&
5589 33 : MySubscription->stream != LOGICALREP_STREAM_OFF)
5590 : {
5591 25 : options->proto.logical.streaming_str = "on";
5592 25 : MyLogicalRepWorker->parallel_apply = false;
5593 : }
5594 : else
5595 : {
5596 8 : options->proto.logical.streaming_str = NULL;
5597 8 : MyLogicalRepWorker->parallel_apply = false;
5598 : }
5599 :
5600 430 : options->proto.logical.twophase = false;
5601 430 : options->proto.logical.origin = pstrdup(MySubscription->origin);
5602 430 : }
5603 :
5604 : /*
5605 : * Cleanup the memory for subxacts and reset the related variables.
5606 : */
5607 : static inline void
5608 376 : cleanup_subxact_info(void)
5609 : {
5610 376 : if (subxact_data.subxacts)
5611 87 : pfree(subxact_data.subxacts);
5612 :
5613 376 : subxact_data.subxacts = NULL;
5614 376 : subxact_data.subxact_last = InvalidTransactionId;
5615 376 : subxact_data.nsubxacts = 0;
5616 376 : subxact_data.nsubxacts_max = 0;
5617 376 : }
5618 :
/*
 * Common function to run the apply loop with error handling. Disable the
 * subscription, if necessary.
 *
 * Note that we don't handle FATAL errors which are probably because
 * of system resource error and are not repeatable.
 */
void
start_apply(XLogRecPtr origin_startpos)
{
	PG_TRY();
	{
		LogicalRepApplyLoop(origin_startpos);
	}
	PG_CATCH();
	{
		/*
		 * Reset the origin state to prevent the advancement of origin
		 * progress if we fail to apply. Otherwise, this will result in
		 * transaction loss as that transaction won't be sent again by the
		 * server.
		 */
		replorigin_xact_clear(true);

		if (MySubscription->disableonerr)
			DisableSubscriptionAndExit();	/* does not return */
		else
		{
			/*
			 * Report the worker failed while applying changes. Abort the
			 * current transaction so that the stats message is sent in an
			 * idle state.
			 */
			AbortOutOfAnyTransaction();
			pgstat_report_subscription_error(MySubscription->oid);

			PG_RE_THROW();
		}
	}
	PG_END_TRY();
}
5660 :
/*
 * Runs the leader apply worker.
 *
 * It sets up replication origin, streaming options and then starts streaming.
 */
static void
run_apply_worker(void)
{
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
	char	   *slotname = NULL;
	WalRcvStreamOptions options;
	ReplOriginId originid;
	TimeLineID	startpointTLI;
	char	   *err;
	bool		must_use_password;

	slotname = MySubscription->slotname;

	/*
	 * This shouldn't happen if the subscription is enabled, but guard against
	 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
	 * slot is NULL.)
	 */
	if (!slotname)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("subscription has no replication slot set")));

	/*
	 * Setup replication origin tracking.  Look up (or create) the origin and
	 * fetch its last-recorded progress as the stream start position.
	 */
	ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
									   originname, sizeof(originname));
	StartTransactionCommand();
	originid = replorigin_by_name(originname, true);
	if (!OidIsValid(originid))
		originid = replorigin_create(originname);
	replorigin_session_setup(originid, 0);
	replorigin_xact_state.origin = originid;
	origin_startpos = replorigin_session_get_progress(false);
	CommitTransactionCommand();

	/* Is the use of a password mandatory? */
	must_use_password = MySubscription->passwordrequired &&
		!MySubscription->ownersuperuser;

	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
											true, must_use_password,
											MySubscription->name, &err);

	if (LogRepWorkerWalRcvConn == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
						MySubscription->name, err)));

	/*
	 * We don't really use the output identify_system for anything but it does
	 * some initializations on the upstream so let's still call it.
	 */
	(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);

	set_apply_error_context_origin(originname);

	set_stream_options(&options, slotname, &origin_startpos);

	/*
	 * Even when the two_phase mode is requested by the user, it remains as
	 * the tri-state PENDING until all tablesyncs have reached READY state.
	 * Only then, can it become ENABLED.
	 *
	 * Note: If the subscription has no tables then leave the state as
	 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
	 * work.
	 */
	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
		AllTablesyncsReady())
	{
		/* Start streaming with two_phase enabled */
		options.proto.logical.twophase = true;
		walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);

		/* Persist the state change in pg_subscription. */
		StartTransactionCommand();

		/*
		 * Updating pg_subscription might involve TOAST table access, so
		 * ensure we have a valid snapshot.
		 */
		PushActiveSnapshot(GetTransactionSnapshot());

		UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
		MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
		PopActiveSnapshot();
		CommitTransactionCommand();
	}
	else
	{
		walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
	}

	ereport(DEBUG1,
			(errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
							 MySubscription->name,
							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
							 "?")));

	/* Run the main loop. */
	start_apply(origin_startpos);
}
5771 :
/*
 * Common initialization for leader apply worker, parallel apply worker,
 * tablesync worker and sequencesync worker.
 *
 * Initialize the database connection, in-memory subscription and necessary
 * config options.
 *
 * Exits the process (rather than returning) if the subscription has been
 * dropped or disabled in the meantime, or if a restart is required.
 */
void
InitializeLogRepWorker(void)
{
	MemoryContext oldctx;

	/* Run as replica session replication role. */
	SetConfigOption("session_replication_role", "replica",
					PGC_SUSET, PGC_S_OVERRIDE);

	/* Connect to our database. */
	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
											  MyLogicalRepWorker->userid,
											  0);

	/*
	 * Set always-secure search path, so malicious users can't redirect user
	 * code (e.g. pg_index.indexprs).
	 */
	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);

	/* Load the subscription into persistent memory context. */
	ApplyContext = AllocSetContextCreate(TopMemoryContext,
										 "ApplyContext",
										 ALLOCSET_DEFAULT_SIZES);
	StartTransactionCommand();
	oldctx = MemoryContextSwitchTo(ApplyContext);

	/*
	 * Lock the subscription to prevent it from being concurrently dropped,
	 * then re-verify its existence. After the initialization, the worker will
	 * be terminated gracefully if the subscription is dropped.
	 */
	LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
					 AccessShareLock);
	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
	if (!MySubscription)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
						MyLogicalRepWorker->subid)));

		/* Ensure we remove no-longer-useful entry for worker's start time */
		if (am_leader_apply_worker())
			ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

		proc_exit(0);
	}

	MySubscriptionValid = true;
	MemoryContextSwitchTo(oldctx);

	if (!MySubscription->enabled)
	{
		ereport(LOG,
				(errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
						MySubscription->name)));

		apply_worker_exit();
	}

	/*
	 * Restart the worker if retain_dead_tuples was enabled during startup.
	 *
	 * At this point, the replication slot used for conflict detection might
	 * not exist yet, or could be dropped soon if the launcher perceives
	 * retain_dead_tuples as disabled. To avoid unnecessary tracking of
	 * oldest_nonremovable_xid when the slot is absent or at risk of being
	 * dropped, a restart is initiated.
	 *
	 * The oldest_nonremovable_xid should be initialized only when the
	 * subscription's retention is active before launching the worker. See
	 * logicalrep_worker_launch.
	 */
	if (am_leader_apply_worker() &&
		MySubscription->retaindeadtuples &&
		MySubscription->retentionactive &&
		!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
	{
		ereport(LOG,
				errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
					   MySubscription->name, "retain_dead_tuples"));

		apply_worker_exit();
	}

	/* Setup synchronous commit according to the user's wishes */
	SetConfigOption("synchronous_commit", MySubscription->synccommit,
					PGC_BACKEND, PGC_S_OVERRIDE);

	/* Change wal_receiver_timeout according to the user's wishes */
	set_wal_receiver_timeout();

	/*
	 * Keep us informed about subscription or role changes. Note that the
	 * role's superuser privilege can be revoked.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
								  subscription_change_cb,
								  (Datum) 0);
	/* Changes to foreign servers may affect subscriptions using SERVER. */
	CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
								  subscription_change_cb,
								  (Datum) 0);
	/* Changes to user mappings may affect subscriptions using SERVER. */
	CacheRegisterSyscacheCallback(USERMAPPINGOID,
								  subscription_change_cb,
								  (Datum) 0);

	/*
	 * Changes to FDW connection_function may affect subscriptions using
	 * SERVER.
	 */
	CacheRegisterSyscacheCallback(FOREIGNDATAWRAPPEROID,
								  subscription_change_cb,
								  (Datum) 0);

	CacheRegisterSyscacheCallback(AUTHOID,
								  subscription_change_cb,
								  (Datum) 0);

	/* Announce which flavor of worker this is. */
	if (am_tablesync_worker())
		ereport(LOG,
				errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
					   MySubscription->name,
					   get_rel_name(MyLogicalRepWorker->relid)));
	else if (am_sequencesync_worker())
		ereport(LOG,
				errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
					   MySubscription->name));
	else
		ereport(LOG,
				errmsg("logical replication apply worker for subscription \"%s\" has started",
					   MySubscription->name));

	CommitTransactionCommand();

	/*
	 * Register a callback to reset the origin state before aborting any
	 * pending transaction during shutdown (see ShutdownPostgres()). This will
	 * avoid origin advancement for an incomplete transaction which could
	 * otherwise lead to its loss as such a transaction won't be sent by the
	 * server again.
	 *
	 * Note that even a LOG or DEBUG statement placed after setting the origin
	 * state may process a shutdown signal before committing the current apply
	 * operation. So, it is important to register such a callback here.
	 *
	 * Register this callback here to ensure that all types of logical
	 * replication workers that set up origins and apply remote transactions
	 * are protected.
	 */
	before_shmem_exit(on_exit_clear_xact_state, (Datum) 0);
}
5932 :
/*
 * Callback on exit to clear transaction-level replication origin state.
 *
 * Registered via before_shmem_exit() in InitializeLogRepWorker(); the code
 * and arg parameters are ignored.
 */
static void
on_exit_clear_xact_state(int code, Datum arg)
{
	replorigin_xact_clear(true);
}
5941 :
/*
 * Common function to setup the leader apply, tablesync and sequencesync worker.
 *
 * Attaches to the given worker slot, sets up signals and stats, loads the
 * walreceiver library, and performs the common worker initialization.
 */
void
SetupApplyOrSyncWorker(int worker_slot)
{
	/* Attach to slot */
	logicalrep_worker_attach(worker_slot);

	Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker());

	/* Setup signal handling */
	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	BackgroundWorkerUnblockSignals();

	/*
	 * We don't currently need any ResourceOwner in a walreceiver process, but
	 * if we did, we could call CreateAuxProcessResourceOwner here.
	 */

	/* Initialise stats to a sanish value */
	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);

	InitializeLogRepWorker();

	/* Connect to the origin and start the replication. */
	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
		 MySubscription->conninfo);

	/*
	 * Setup callback for syscache so that we know when something changes in
	 * the subscription relation state.
	 */
	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
								  InvalidateSyncingRelStates,
								  (Datum) 0);
}
5983 :
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
{
	/* main_arg carries the worker slot number assigned by the launcher */
	int			worker_slot = DatumGetInt32(main_arg);

	/* Mark that we're still in the initialization phase. */
	InitializingApplyWorker = true;

	SetupApplyOrSyncWorker(worker_slot);

	InitializingApplyWorker = false;

	run_apply_worker();

	proc_exit(0);
}
6000 :
/*
 * After error recovery, disable the subscription in a new transaction
 * and exit cleanly.
 *
 * This function does not return; it ends with proc_exit(0).
 */
void
DisableSubscriptionAndExit(void)
{
	/*
	 * Emit the error message, and recover from the error state to an idle
	 * state
	 */
	HOLD_INTERRUPTS();

	EmitErrorReport();
	AbortOutOfAnyTransaction();
	FlushErrorState();

	RESUME_INTERRUPTS();

	/*
	 * Report the worker failed during sequence synchronization, table
	 * synchronization, or apply.
	 */
	pgstat_report_subscription_error(MyLogicalRepWorker->subid);

	/* Disable the subscription */
	StartTransactionCommand();

	/*
	 * Updating pg_subscription might involve TOAST table access, so ensure we
	 * have a valid snapshot.
	 */
	PushActiveSnapshot(GetTransactionSnapshot());

	DisableSubscription(MySubscription->oid);
	PopActiveSnapshot();
	CommitTransactionCommand();

	/* Ensure we remove no-longer-useful entry for worker's start time */
	if (am_leader_apply_worker())
		ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);

	/* Notify the subscription has been disabled and exit */
	ereport(LOG,
			errmsg("subscription \"%s\" has been disabled because of an error",
				   MySubscription->name));

	/*
	 * Skip the track_commit_timestamp check when disabling the worker due to
	 * an error, as verifying commit timestamps is unnecessary in this
	 * context.
	 */
	CheckSubDeadTupleRetention(false, true, WARNING,
							   MySubscription->retaindeadtuples,
							   MySubscription->retentionactive, false);

	proc_exit(0);
}
6059 :
6060 : /*
6061 : * Is current process a logical replication worker?
6062 : */
6063 : bool
6064 2218 : IsLogicalWorker(void)
6065 : {
6066 2218 : return MyLogicalRepWorker != NULL;
6067 : }
6068 :
6069 : /*
6070 : * Is current process a logical replication parallel apply worker?
6071 : */
6072 : bool
6073 1523 : IsLogicalParallelApplyWorker(void)
6074 : {
6075 1523 : return IsLogicalWorker() && am_parallel_apply_worker();
6076 : }
6077 :
/*
 * Start skipping changes of the transaction if the given LSN matches the
 * LSN specified by subscription's skiplsn.
 */
static void
maybe_start_skipping_changes(XLogRecPtr finish_lsn)
{
	/* must not already be skipping, and must be between transactions */
	Assert(!is_skipping_changes());
	Assert(!in_remote_transaction);
	Assert(!in_streamed_transaction);

	/*
	 * Quick return if it's not requested to skip this transaction. This
	 * function is called for every remote transaction and we assume that
	 * skipping the transaction is not used often.
	 */
	if (likely(!XLogRecPtrIsValid(MySubscription->skiplsn) ||
			   MySubscription->skiplsn != finish_lsn))
		return;

	/* Start skipping all changes of this transaction */
	skip_xact_finish_lsn = finish_lsn;

	ereport(LOG,
			errmsg("logical replication starts skipping transaction at LSN %X/%08X",
				   LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
}
6105 :
6106 : /*
6107 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
6108 : */
6109 : static void
6110 30 : stop_skipping_changes(void)
6111 : {
6112 30 : if (!is_skipping_changes())
6113 27 : return;
6114 :
6115 3 : ereport(LOG,
6116 : errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6117 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6118 :
6119 : /* Stop skipping changes */
6120 3 : skip_xact_finish_lsn = InvalidXLogRecPtr;
6121 : }
6122 :
6123 : /*
6124 : * Clear subskiplsn of pg_subscription catalog.
6125 : *
6126 : * finish_lsn is the transaction's finish LSN that is used to check if the
6127 : * subskiplsn matches it. If not matched, we raise a warning when clearing the
6128 : * subskiplsn in order to inform users for cases e.g., where the user mistakenly
6129 : * specified the wrong subskiplsn.
6130 : */
6131 : static void
6132 545 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
6133 : {
6134 : Relation rel;
6135 : Form_pg_subscription subform;
6136 : HeapTuple tup;
6137 545 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
6138 545 : bool started_tx = false;
6139 :
6140 545 : if (likely(!XLogRecPtrIsValid(myskiplsn)) || am_parallel_apply_worker())
6141 542 : return;
6142 :
6143 3 : if (!IsTransactionState())
6144 : {
6145 1 : StartTransactionCommand();
6146 1 : started_tx = true;
6147 : }
6148 :
6149 : /*
6150 : * Updating pg_subscription might involve TOAST table access, so ensure we
6151 : * have a valid snapshot.
6152 : */
6153 3 : PushActiveSnapshot(GetTransactionSnapshot());
6154 :
6155 : /*
6156 : * Protect subskiplsn of pg_subscription from being concurrently updated
6157 : * while clearing it.
6158 : */
6159 3 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
6160 : AccessShareLock);
6161 :
6162 3 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
6163 :
6164 : /* Fetch the existing tuple. */
6165 3 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
6166 : ObjectIdGetDatum(MySubscription->oid));
6167 :
6168 3 : if (!HeapTupleIsValid(tup))
6169 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6170 :
6171 3 : subform = (Form_pg_subscription) GETSTRUCT(tup);
6172 :
6173 : /*
6174 : * Clear the subskiplsn. If the user has already changed subskiplsn before
6175 : * clearing it we don't update the catalog and the replication origin
6176 : * state won't get advanced. So in the worst case, if the server crashes
6177 : * before sending an acknowledgment of the flush position the transaction
6178 : * will be sent again and the user needs to set subskiplsn again. We can
6179 : * reduce the possibility by logging a replication origin WAL record to
6180 : * advance the origin LSN instead but there is no way to advance the
6181 : * origin timestamp and it doesn't seem to be worth doing anything about
6182 : * it since it's a very rare case.
6183 : */
6184 3 : if (subform->subskiplsn == myskiplsn)
6185 : {
6186 : bool nulls[Natts_pg_subscription];
6187 : bool replaces[Natts_pg_subscription];
6188 : Datum values[Natts_pg_subscription];
6189 :
6190 3 : memset(values, 0, sizeof(values));
6191 3 : memset(nulls, false, sizeof(nulls));
6192 3 : memset(replaces, false, sizeof(replaces));
6193 :
6194 : /* reset subskiplsn */
6195 3 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
6196 3 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
6197 :
6198 3 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
6199 : replaces);
6200 3 : CatalogTupleUpdate(rel, &tup->t_self, tup);
6201 :
6202 3 : if (myskiplsn != finish_lsn)
6203 0 : ereport(WARNING,
6204 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6205 : errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6206 : LSN_FORMAT_ARGS(finish_lsn),
6207 : LSN_FORMAT_ARGS(myskiplsn)));
6208 : }
6209 :
6210 3 : heap_freetuple(tup);
6211 3 : table_close(rel, NoLock);
6212 :
6213 3 : PopActiveSnapshot();
6214 :
6215 3 : if (started_tx)
6216 1 : CommitTransactionCommand();
6217 : }
6218 :
6219 : /* Error callback to give more context info about the change being applied */
 : /* Note: "arg" is unused; all state is read from apply_error_callback_arg. */
6220 : void
6221 4894 : apply_error_callback(void *arg)
6222 : {
6223 4894 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
6224 :
 : /* command == 0 means no change is being applied; add no context. */
6225 4894 : if (apply_error_callback_arg.command == 0)
6226 4478 : return;
6227 :
6228 : Assert(errarg->origin_name);
6229 :
 : /* Without relation info, report only origin, message type, xid and LSN. */
6230 416 : if (errarg->rel == NULL)
6231 : {
6232 336 : if (!TransactionIdIsValid(errarg->remote_xid))
6233 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6234 : errarg->origin_name,
6235 : logicalrep_message_type(errarg->command));
6236 336 : else if (!XLogRecPtrIsValid(errarg->finish_lsn))
6237 262 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6238 : errarg->origin_name,
6239 : logicalrep_message_type(errarg->command),
6240 : errarg->remote_xid);
6241 : else
6242 148 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6243 : errarg->origin_name,
6244 : logicalrep_message_type(errarg->command),
6245 : errarg->remote_xid,
6246 74 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6247 : }
6248 : else
6249 : {
 : /* remote_attnum < 0 means no specific column is being processed. */
6250 80 : if (errarg->remote_attnum < 0)
6251 : {
6252 80 : if (!XLogRecPtrIsValid(errarg->finish_lsn))
6253 4 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6254 : errarg->origin_name,
6255 : logicalrep_message_type(errarg->command),
6256 2 : errarg->rel->remoterel.nspname,
6257 2 : errarg->rel->remoterel.relname,
6258 : errarg->remote_xid);
6259 : else
6260 156 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6261 : errarg->origin_name,
6262 : logicalrep_message_type(errarg->command),
6263 78 : errarg->rel->remoterel.nspname,
6264 78 : errarg->rel->remoterel.relname,
6265 : errarg->remote_xid,
6266 78 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6267 : }
6268 : else
6269 : {
 : /* Column-level context: include the remote column name as well. */
6270 0 : if (!XLogRecPtrIsValid(errarg->finish_lsn))
6271 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6272 : errarg->origin_name,
6273 : logicalrep_message_type(errarg->command),
6274 0 : errarg->rel->remoterel.nspname,
6275 0 : errarg->rel->remoterel.relname,
6276 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6277 : errarg->remote_xid);
6278 : else
6279 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6280 : errarg->origin_name,
6281 : logicalrep_message_type(errarg->command),
6282 0 : errarg->rel->remoterel.nspname,
6283 0 : errarg->rel->remoterel.relname,
6284 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6285 : errarg->remote_xid,
6286 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6287 : }
6288 : }
6289 : }
6290 :
6291 : /* Set transaction information of apply error callback */
6292 : static inline void
6293 2946 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
6294 : {
6295 2946 : apply_error_callback_arg.remote_xid = xid;
6296 2946 : apply_error_callback_arg.finish_lsn = lsn;
6297 2946 : }
6298 :
6299 : /* Reset all information of apply error callback */
6300 : static inline void
6301 1442 : reset_apply_error_context_info(void)
6302 : {
6303 1442 : apply_error_callback_arg.command = 0;
6304 1442 : apply_error_callback_arg.rel = NULL;
6305 1442 : apply_error_callback_arg.remote_attnum = -1;
6306 1442 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
6307 1442 : }
6308 :
6309 : /*
6310 : * Request wakeup of the workers for the given subscription OID
6311 : * at commit of the current transaction.
6312 : *
6313 : * This is used to ensure that the workers process assorted changes
6314 : * as soon as possible.
6315 : */
6316 : void
6317 234 : LogicalRepWorkersWakeupAtCommit(Oid subid)
6318 : {
6319 : MemoryContext oldcxt;
6320 :
6321 234 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
6322 234 : on_commit_wakeup_workers_subids =
6323 234 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
6324 234 : MemoryContextSwitchTo(oldcxt);
6325 234 : }
6326 :
6327 : /*
6328 : * Wake up the workers of any subscriptions that were changed in this xact.
6329 : */
6330 : void
6331 599897 : AtEOXact_LogicalRepWorkers(bool isCommit)
6332 : {
6333 599897 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
6334 : {
6335 : ListCell *lc;
6336 :
6337 229 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
6338 458 : foreach(lc, on_commit_wakeup_workers_subids)
6339 : {
6340 229 : Oid subid = lfirst_oid(lc);
6341 : List *workers;
6342 : ListCell *lc2;
6343 :
6344 229 : workers = logicalrep_workers_find(subid, true, false);
6345 300 : foreach(lc2, workers)
6346 : {
6347 71 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
6348 :
6349 71 : logicalrep_worker_wakeup_ptr(worker);
6350 : }
6351 : }
6352 229 : LWLockRelease(LogicalRepWorkerLock);
6353 : }
6354 :
6355 : /* The List storage will be reclaimed automatically in xact cleanup. */
6356 599897 : on_commit_wakeup_workers_subids = NIL;
6357 599897 : }
6358 :
6359 : /*
6360 : * Allocate the origin name in long-lived context for error context message.
6361 : */
6362 : void
6363 442 : set_apply_error_context_origin(char *originname)
6364 : {
6365 442 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
6366 : originname);
6367 442 : }
6368 :
6369 : /*
6370 : * Return the action to be taken for the given transaction. See
6371 : * TransApplyAction for information on each of the actions.
6372 : *
6373 : * *winfo is assigned to the destination parallel worker info when the leader
6374 : * apply worker has to pass all the transaction's changes to the parallel
6375 : * apply worker.
6376 : */
6377 : static TransApplyAction
6378 326552 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
6379 : {
6380 326552 : *winfo = NULL;
6381 :
6382 326552 : if (am_parallel_apply_worker())
6383 : {
6384 69108 : return TRANS_PARALLEL_APPLY;
6385 : }
6386 :
6387 : /*
6388 : * If we are processing this transaction using a parallel apply worker
6389 : * then either we send the changes to the parallel worker or if the worker
6390 : * is busy then serialize the changes to the file which will later be
6391 : * processed by the parallel worker.
6392 : */
6393 257444 : *winfo = pa_find_worker(xid);
6394 :
6395 257444 : if (*winfo && (*winfo)->serialize_changes)
6396 : {
6397 5037 : return TRANS_LEADER_PARTIAL_SERIALIZE;
6398 : }
6399 252407 : else if (*winfo)
6400 : {
6401 68906 : return TRANS_LEADER_SEND_TO_PARALLEL;
6402 : }
6403 :
6404 : /*
6405 : * If there is no parallel worker involved to process this transaction
6406 : * then we either directly apply the change or serialize it to a file
6407 : * which will later be applied when the transaction finish message is
6408 : * processed.
6409 : */
6410 183501 : else if (in_streamed_transaction)
6411 : {
6412 103198 : return TRANS_LEADER_SERIALIZE;
6413 : }
6414 : else
6415 : {
6416 80303 : return TRANS_LEADER_APPLY;
6417 : }
6418 : }
|