Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server facing code and shares libpqwalreceiver
19 : * module with walreceiver for providing the libpq specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option as on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions has
33 : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which is then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in tmp file directory by default, and the filenames
38 : * include both the XID of the toplevel transaction and OID of the
39 : * subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID don't interfere with each other.
41 : *
42 : * We use BufFiles instead of using normal temporary files because (a) the
43 : * BufFile infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) provides a way for automatic cleanup on error and (c) provides
45 : * a way for these files to survive across local transactions, so they can be
46 : * opened and closed at stream start and stop. We decided to use FileSet
47 : * infrastructure as without that it deletes the files on the closure of the
48 : * file and if we decide to keep stream files open across the start/stop stream
49 : * then it will consume a lot of memory (more than 8K for each BufFile and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we don't use FileSet then we also need to invent
53 : * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 : * the file we desired across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option as parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : * tablesync worker might not get such a prepared transaction because say it
72 : * was prior to the initial consistent point but might have got some later
73 : * commits. Now, the tablesync worker will exit without doing anything for the
74 : * prepared transaction skipped by the apply worker as the sync location for it
75 : * will be already ahead of the apply worker's current location. This would lead
76 : * to an "empty prepare", because later when the apply worker does the commit
77 : * prepare, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this, and similar prepare confusions the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * ProcessSyncingTablesForApply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING it starts streaming messages with the
98 : * two_phase option which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time we have not enabled two_phase, the
101 : * publisher (replication server) would have skipped some prepares but we
102 : * ensure that such prepares are sent along with commit prepare, see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * Finally, to avoid problems mentioned in previous paragraphs from any
113 : * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
114 : * to 'off' and then again back to 'on') there is a restriction for
115 : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 : * the two_phase tri-state is ENABLED, except when copy_data = false.
117 : *
118 : * We can get prepare of the same GID more than once for the genuine cases
119 : * where we have defined multiple subscriptions for publications on the same
120 : * server and prepared transaction has operations on tables subscribed to those
121 : * subscriptions. For such cases, if we use the GID sent by publisher one of
122 : * the prepares will be successful and others will fail, in which case the
123 : * server will send them again. Now, this can lead to a deadlock if user has
124 : * set synchronous_standby_names for all the subscriptions on subscriber. To
125 : * avoid such deadlocks, we generate a unique GID (consisting of the
126 : * subscription oid and the xid of the prepared transaction) for each prepare
127 : * transaction on the subscriber.
128 : *
129 : * FAILOVER
130 : * ----------------------
131 : * The logical slot on the primary can be synced to the standby by specifying
132 : * failover = true when creating the subscription. Enabling failover allows us
133 : * to smoothly transition to the promoted standby, ensuring that we can
134 : * subscribe to the new primary without losing any data.
135 : *
136 : * RETAIN DEAD TUPLES
137 : * ----------------------
138 : * Each apply worker with the retain_dead_tuples option enabled maintains a
139 : * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
140 : * prevent dead rows from being removed prematurely when the apply worker still
141 : * needs them to detect update_deleted conflicts. Additionally, this helps to
142 : * retain the required commit_ts module information, which further helps to
143 : * detect update_origin_differs and delete_origin_differs conflicts reliably, as
144 : * otherwise, vacuum freeze could remove the required information.
145 : *
146 : * The logical replication launcher manages an internal replication slot named
147 : * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 : * transaction ID from all apply workers to determine the appropriate xmin for
149 : * the slot, thereby retaining necessary tuples.
150 : *
151 : * The non-removable transaction ID in the apply worker is advanced to the
152 : * oldest running transaction ID once all concurrent transactions on the
153 : * publisher have been applied and flushed locally. The process involves:
154 : *
155 : * - RDT_GET_CANDIDATE_XID:
156 : * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 : * candidate xid.
158 : *
159 : * - RDT_REQUEST_PUBLISHER_STATUS:
160 : * Send a message to the walsender requesting the publisher status, which
161 : * includes the latest WAL write position and information about transactions
162 : * that are in the commit phase.
163 : *
164 : * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 : * Wait for the status from the walsender. After receiving the first status,
166 : * do not proceed if there are concurrent remote transactions that are still
167 : * in the commit phase. These transactions might have been assigned an
168 : * earlier commit timestamp but have not yet written the commit WAL record.
169 : * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 : * until all these transactions have completed.
171 : *
172 : * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 : * Advance the non-removable transaction ID if the current flush location has
174 : * reached or surpassed the last received WAL position.
175 : *
176 : * - RDT_STOP_CONFLICT_INFO_RETENTION:
177 : * This phase is required only when max_retention_duration is defined. We
178 : * enter this phase if the wait time in either the
179 : * RDT_WAIT_FOR_PUBLISHER_STATUS or RDT_WAIT_FOR_LOCAL_FLUSH phase exceeds
180 : * configured max_retention_duration. In this phase,
181 : * pg_subscription.subretentionactive is updated to false within a new
182 : * transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
183 : *
184 : * - RDT_RESUME_CONFLICT_INFO_RETENTION:
185 : * This phase is required only when max_retention_duration is defined. We
186 : * enter this phase if the retention was previously stopped, and the time
187 : * required to advance the non-removable transaction ID in the
188 : * RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
189 : * (or if max_retention_duration is set to 0). During this phase,
190 : * pg_subscription.subretentionactive is updated to true within a new
191 : * transaction, and the worker will be restarted.
192 : *
193 : * The overall state progression is: GET_CANDIDATE_XID ->
194 : * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
195 : * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
196 : * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
197 : *
198 : * Retaining the dead tuples for this period is sufficient for ensuring
199 : * eventual consistency using last-update-wins strategy, as dead tuples are
200 : * useful for detecting conflicts only during the application of concurrent
201 : * transactions from remote nodes. After applying and flushing all remote
202 : * transactions that occurred concurrently with the tuple DELETE, any
203 : * subsequent UPDATE from a remote node should have a later timestamp. In such
204 : * cases, it is acceptable to detect an update_missing scenario and convert the
205 : * UPDATE to an INSERT when applying it. But, for concurrent remote
206 : * transactions with earlier timestamps than the DELETE, detecting
207 : * update_deleted is necessary, as the UPDATEs in remote transactions should be
208 : * ignored if their timestamp is earlier than that of the dead tuples.
209 : *
210 : * Note that advancing the non-removable transaction ID is not supported if the
211 : * publisher is also a physical standby. This is because the logical walsender
212 : * on the standby can only get the WAL replay position but there may be more
213 : * WALs that are being replicated from the primary and those WALs could have
214 : * earlier commit timestamp.
215 : *
216 : * Similarly, when the publisher has subscribed to another publisher,
217 : * information necessary for conflict detection cannot be retained for
218 : * changes from origins other than the publisher. This is because publisher
219 : * lacks the information on concurrent transactions of other publishers to
220 : * which it subscribes. As the information on concurrent transactions is
221 : * unavailable beyond subscriber's immediate publishers, the non-removable
222 : * transaction ID might be advanced prematurely before changes from other
223 : * origins have been fully applied.
224 : *
225 : * XXX Retaining information for changes from other origins might be possible
226 : * by requesting the subscription on that origin to enable retain_dead_tuples
227 : * and fetching the conflict detection slot.xmin along with the publisher's
228 : * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
229 : * wait for the remote slot's xmin to reach the oldest active transaction ID,
230 : * ensuring that all transactions from other origins have been applied on the
231 : * publisher, thereby getting the latest WAL position that includes all
232 : * concurrent changes. However, this approach may impact performance, so it
233 : * might not be worth the effort.
234 : *
235 : * XXX It seems feasible to get the latest commit's WAL location from the
236 : * publisher and wait till that is applied. However, we can't do that
237 : * because commit timestamps can regress as a commit with a later LSN is not
238 : * guaranteed to have a later timestamp than those with earlier LSNs. Having
239 : * said that, even if that were possible, it wouldn't improve performance much
240 : * as the apply always lags behind and moves slowly compared with the
241 : * transactions on the publisher.
242 : *-------------------------------------------------------------------------
243 : */
244 :
245 : #include "postgres.h"
246 :
247 : #include <sys/stat.h>
248 : #include <unistd.h>
249 :
250 : #include "access/commit_ts.h"
251 : #include "access/table.h"
252 : #include "access/tableam.h"
253 : #include "access/twophase.h"
254 : #include "access/xact.h"
255 : #include "catalog/indexing.h"
256 : #include "catalog/pg_inherits.h"
257 : #include "catalog/pg_subscription.h"
258 : #include "catalog/pg_subscription_rel.h"
259 : #include "commands/subscriptioncmds.h"
260 : #include "commands/tablecmds.h"
261 : #include "commands/trigger.h"
262 : #include "executor/executor.h"
263 : #include "executor/execPartition.h"
264 : #include "libpq/pqformat.h"
265 : #include "miscadmin.h"
266 : #include "optimizer/optimizer.h"
267 : #include "parser/parse_relation.h"
268 : #include "pgstat.h"
269 : #include "postmaster/bgworker.h"
270 : #include "postmaster/interrupt.h"
271 : #include "postmaster/walwriter.h"
272 : #include "replication/conflict.h"
273 : #include "replication/logicallauncher.h"
274 : #include "replication/logicalproto.h"
275 : #include "replication/logicalrelation.h"
276 : #include "replication/logicalworker.h"
277 : #include "replication/origin.h"
278 : #include "replication/slot.h"
279 : #include "replication/walreceiver.h"
280 : #include "replication/worker_internal.h"
281 : #include "rewrite/rewriteHandler.h"
282 : #include "storage/buffile.h"
283 : #include "storage/ipc.h"
284 : #include "storage/lmgr.h"
285 : #include "storage/procarray.h"
286 : #include "tcop/tcopprot.h"
287 : #include "utils/acl.h"
288 : #include "utils/guc.h"
289 : #include "utils/inval.h"
290 : #include "utils/lsyscache.h"
291 : #include "utils/memutils.h"
292 : #include "utils/pg_lsn.h"
293 : #include "utils/rel.h"
294 : #include "utils/rls.h"
295 : #include "utils/snapmgr.h"
296 : #include "utils/syscache.h"
297 : #include "utils/usercontext.h"
298 :
299 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s), i.e. the value is in milliseconds */
300 :
/*
 * One list entry pairing two WAL positions: local_end and remote_end.
 * "node" is the embedded dlist link that lets an entry be chained into a
 * dlist.
 *
 * NOTE(review): presumably local_end is a position in the local WAL and
 * remote_end the matching position on the publisher, and the entries are
 * the elements of lsn_mapping below -- the code that fills and consumes the
 * list is outside this view, confirm there.
 */
301 : typedef struct FlushPosition
302 : {
303 : dlist_node node; /* dlist link */
304 : XLogRecPtr local_end; /* WAL position, local side (see note above) */
305 : XLogRecPtr remote_end; /* WAL position, remote side (see note above) */
306 : } FlushPosition;
307 :
/* File-private dlist head, statically initialized. */
308 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
309 :
/*
 * Working state bundled together for applying a change to one replication
 * target relation: the executor state, the target relation with its
 * ResultRelInfo, and the extra pieces that are only used when the target
 * relation is partitioned.
 */
310 : typedef struct ApplyExecutionData
311 : {
312 : EState *estate; /* executor state, used to track resources */
313 :
314 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
315 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
316 :
317 : /* These fields are used when the target relation is partitioned: */
318 : ModifyTableState *mtstate; /* dummy ModifyTable state */
319 : PartitionTupleRouting *proute; /* partition routing info */
320 : } ApplyExecutionData;
321 :
322 : /* Struct for saving and restoring apply errcontext information */
/*
 * "command" and "remote_attnum" have explicit "invalid" sentinels (0 and -1
 * respectively); the file-scope instance apply_error_callback_arg is
 * initialized to exactly those sentinels.
 */
323 : typedef struct ApplyErrorCallbackArg
324 : {
325 : LogicalRepMsgType command; /* 0 if invalid */
326 : LogicalRepRelMapEntry *rel; /* relation map entry; NULL in the initial state */
327 :
328 : /* Remote node information */
329 : int remote_attnum; /* -1 if invalid */
330 : TransactionId remote_xid; /* InvalidTransactionId in the initial state */
331 : XLogRecPtr finish_lsn; /* InvalidXLogRecPtr in the initial state */
332 : char *origin_name; /* NULL in the initial state */
333 : } ApplyErrorCallbackArg;
334 :
/*
 * What to do with the changes of the transaction being processed.
 *
 * TRANS_LEADER_APPLY:
 * We are the leader apply worker or a table sync worker, and the changes are
 * applied by this worker itself -- either directly, or (for streaming
 * transactions) after reading them back from temporary files.
 *
 * TRANS_LEADER_SERIALIZE:
 * We are the leader apply worker or a table sync worker, and the changes are
 * spooled to temporary files; they get applied once the final commit arrives.
 *
 * TRANS_LEADER_SEND_TO_PARALLEL:
 * We are the leader apply worker and the changes have to be handed over to a
 * parallel apply worker.
 *
 * TRANS_LEADER_PARTIAL_SERIALIZE:
 * We are the leader apply worker; some changes were already sent directly to
 * the parallel apply worker, but a timeout while sending data forced the
 * remaining ones to be serialized to a file. The parallel apply worker
 * applies those serialized changes when the final commit arrives.
 *
 * TRANS_LEADER_SERIALIZE cannot be reused for that situation: besides
 * serializing the changes, the leader must also serialize the STREAM_XXX
 * message to the file and, when processing the transaction finish command,
 * wait for the parallel apply worker to finish the transaction. A separate
 * action keeps the code and logic clear.
 *
 * TRANS_PARALLEL_APPLY:
 * We are the parallel apply worker and apply the changes of the transaction
 * directly.
 */
typedef enum
{
	/* Action used for non-streaming transactions. */
	TRANS_LEADER_APPLY = 0,

	/* Actions used for streaming transactions. */
	TRANS_LEADER_SERIALIZE = 1,
	TRANS_LEADER_SEND_TO_PARALLEL = 2,
	TRANS_LEADER_PARTIAL_SERIALIZE = 3,
	TRANS_PARALLEL_APPLY = 4,
} TransApplyAction;
380 :
/*
 * Phases the apply worker goes through while advancing the non-removable
 * transaction ID.
 *
 * The transitions between the phases are described in the comments atop
 * worker.c.
 */
typedef enum
{
	RDT_GET_CANDIDATE_XID = 0,
	RDT_REQUEST_PUBLISHER_STATUS = 1,
	RDT_WAIT_FOR_PUBLISHER_STATUS = 2,
	RDT_WAIT_FOR_LOCAL_FLUSH = 3,
	RDT_STOP_CONFLICT_INFO_RETENTION = 4,
	RDT_RESUME_CONFLICT_INFO_RETENTION = 5,
} RetainDeadTuplesPhase;
396 :
397 : /*
398 : * Critical information for managing phase transitions within the
399 : * RetainDeadTuplesPhase.
 *
 * One instance carries the current phase plus everything learned so far in
 * the current round of advancing the non-removable transaction ID (see the
 * RETAIN DEAD TUPLES section in the comments atop this file).
400 : */
401 : typedef struct RetainDeadTuplesData
402 : {
403 : RetainDeadTuplesPhase phase; /* current phase */
404 : XLogRecPtr remote_lsn; /* WAL write position on the publisher */
405 :
406 : /*
407 : * Oldest transaction ID that was in the commit phase on the publisher.
408 : * Use FullTransactionId to prevent issues with transaction ID wraparound,
409 : * where a new remote_oldestxid could falsely appear to originate from the
410 : * past and block advancement.
411 : */
412 : FullTransactionId remote_oldestxid;
413 :
414 : /*
415 : * Next transaction ID to be assigned on the publisher. Use
416 : * FullTransactionId for consistency and to allow straightforward
417 : * comparisons with remote_oldestxid.
418 : */
419 : FullTransactionId remote_nextxid;
420 :
421 : TimestampTz reply_time; /* when the publisher responds with status */
422 :
423 : /*
424 : * Publisher transaction ID that must be awaited to complete before
425 : * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
426 : * FullTransactionId for the same reason as remote_nextxid.
427 : */
428 : FullTransactionId remote_wait_for;
429 :
430 : TransactionId candidate_xid; /* candidate for the non-removable
431 : * transaction ID */
432 : TimestampTz flushpos_update_time; /* when the remote flush position was
433 : * updated in final phase
434 : * (RDT_WAIT_FOR_LOCAL_FLUSH) */
435 :
/* NOTE(review): the unit of table_sync_wait_time is not visible here -- TODO confirm. */
436 : long table_sync_wait_time; /* time spent waiting for table sync
437 : * to finish */
438 :
439 : /*
440 : * The following fields are used to determine the timing for the next
441 : * round of transaction ID advancement.
442 : */
443 : TimestampTz last_recv_time; /* when the last message was received */
444 : TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
445 : int xid_advance_interval; /* how much time (ms) to wait before
446 : * attempting to advance the
447 : * non-removable transaction ID */
448 : } RetainDeadTuplesData;
449 :
/*
 * Lower and upper bounds, in milliseconds, for the interval between attempts
 * to advance the non-removable transaction ID: 100ms at the least, three
 * minutes at the most. The upper bound is somewhat arbitrary, but large
 * enough to avoid undue network traffic.
 */
#define MIN_XID_ADVANCE_INTERVAL	100
#define MAX_XID_ADVANCE_INTERVAL	(3 * 60 * 1000)
457 :
458 : /* errcontext tracker */
/*
 * File-private instance of ApplyErrorCallbackArg. It starts out in the
 * "nothing in progress" state: command 0 and remote_attnum -1 are the
 * documented invalid markers of the struct, and the remaining fields are
 * NULL / Invalid*.
 */
459 : static ApplyErrorCallbackArg apply_error_callback_arg =
460 : {
461 : .command = 0,
462 : .rel = NULL,
463 : .remote_attnum = -1,
464 : .remote_xid = InvalidTransactionId,
465 : .finish_lsn = InvalidXLogRecPtr,
466 : .origin_name = NULL,
467 : };
468 :
/*
 * Worker-wide state. Variables without "static" have external linkage and
 * are therefore visible to other files; the static ones are private to
 * this file.
 */
469 : ErrorContextCallback *apply_error_context_stack = NULL;
470 :
/* NOTE(review): presumably per-message and worker-lifetime memory contexts -- confirm where they are created. */
471 : MemoryContext ApplyMessageContext = NULL;
472 : MemoryContext ApplyContext = NULL;
473 :
474 : /* per stream context for streaming transactions */
475 : static MemoryContext LogicalStreamingContext = NULL;
476 :
477 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
478 :
/* The subscription this worker runs for; its name is used in error messages below. */
479 : Subscription *MySubscription = NULL;
480 : static bool MySubscriptionValid = false;
481 :
482 : static List *on_commit_wakeup_workers_subids = NIL;
483 :
484 : bool in_remote_transaction = false;
/*
 * Set in apply_handle_begin (per the comment on
 * should_apply_changes_for_rel); compared there against a relation's
 * statelsn.
 */
485 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
486 :
487 : /* fields valid only when processing streamed transaction */
488 : static bool in_streamed_transaction = false;
489 :
490 : static TransactionId stream_xid = InvalidTransactionId;
491 :
492 : /*
493 : * The number of changes applied by parallel apply worker during one streaming
494 : * block.
495 : */
496 : static uint32 parallel_stream_nchanges = 0;
497 :
498 : /* Are we initializing an apply worker? */
499 : bool InitializingApplyWorker = false;
500 :
501 : /*
502 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
503 : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
504 : * Once we start skipping changes, we don't stop it until we skip all changes of
505 : * the transaction even if pg_subscription is updated and MySubscription->skiplsn
506 : * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
507 : * we don't skip receiving and spooling the changes since we decide whether or not
508 : * to skip applying the changes when starting to apply changes. The subskiplsn is
509 : * cleared after successfully skipping the transaction or applying non-empty
510 : * transaction. The latter prevents the mistakenly specified subskiplsn from
511 : * being left. Note that we cannot skip the streaming transactions when using
512 : * parallel apply workers because we cannot get the finish LSN before applying
513 : * the changes. So, we don't start parallel apply worker when finish LSN is set
514 : * by the user.
515 : */
516 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
/*
 * True while skip_xact_finish_lsn holds a valid LSN. The test is wrapped in
 * unlikely(), i.e. skipping is treated as the uncommon case.
 */
517 : #define is_skipping_changes() (unlikely(XLogRecPtrIsValid(skip_xact_finish_lsn)))
518 :
519 : /* BufFile handle of the current streaming file */
520 : static BufFile *stream_fd = NULL;
521 :
522 : /*
523 : * The remote WAL position that has been applied and flushed locally. We record
524 : * and use this information both while sending feedback to the server and
525 : * advancing oldest_nonremovable_xid.
526 : */
527 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
528 :
/*
 * Position recorded for one subtransaction of a streamed transaction: the
 * subtransaction's XID plus a (file number, offset) pair locating a point
 * in the BufFile. Per the comments atop this file, such offsets are what
 * allows truncating the file of serialized changes when a subtransaction
 * aborts.
 */
529 : typedef struct SubXactInfo
530 : {
531 : TransactionId xid; /* XID of the subxact */
532 : int fileno; /* file number in the buffile */
533 : off_t offset; /* offset in the file */
534 : } SubXactInfo;
535 :
536 : /* Sub-transaction data for the current streaming transaction */
/*
 * "subxacts" is a growable array: nsubxacts entries are in use out of a
 * capacity of nsubxacts_max.
 */
537 : typedef struct ApplySubXactData
538 : {
539 : uint32 nsubxacts; /* number of sub-transactions */
540 : uint32 nsubxacts_max; /* current capacity of subxacts */
541 : TransactionId subxact_last; /* xid of the last sub-transaction */
542 : SubXactInfo *subxacts; /* sub-xact offset in changes file */
543 : } ApplySubXactData;
544 :
/*
 * The single file-private instance, starting out empty. The initializer is
 * positional, so it must be kept in step with the field order above.
 */
545 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
546 :
/*
 * Forward declarations of file-private helpers for streamed transactions.
 *
 * NOTE(review): subxact_filename/changes_filename presumably write the name
 * of the per-(subscription, toplevel XID) file into the caller-supplied
 * "path" buffer; no buffer size is passed, so confirm the expected buffer
 * size at the definitions.
 */
547 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
548 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
549 :
550 : /*
551 : * Information about subtransactions of a given toplevel transaction.
552 : */
553 : static void subxact_info_write(Oid subid, TransactionId xid);
554 : static void subxact_info_read(Oid subid, TransactionId xid);
555 : static void subxact_info_add(TransactionId xid);
556 : static inline void cleanup_subxact_info(void);
557 :
558 : /*
559 : * Serialize and deserialize changes for a toplevel transaction.
560 : */
561 : static void stream_open_file(Oid subid, TransactionId xid,
562 : bool first_segment);
563 : static void stream_write_change(char action, StringInfo s);
564 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
565 : static void stream_close_file(void);
566 :
/* Forward declaration: feedback to the server (see the last_flushpos comment in this file). */
567 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
568 :
/*
 * Forward declarations for advancing the non-removable transaction ID.
 * All of them operate on a RetainDeadTuplesData, except
 * update_retention_status(), which takes only the new "active" flag. The
 * phases are described in the RETAIN DEAD TUPLES section of the comments
 * atop this file.
 */
569 : static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
570 : bool status_received);
571 : static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
572 : static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
573 : bool status_received);
574 : static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
575 : static void request_publisher_status(RetainDeadTuplesData *rdt_data);
576 : static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
577 : bool status_received);
578 : static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
579 : static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
580 : static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
581 : static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data);
582 : static bool update_retention_status(bool active);
583 : static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
584 : static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
585 : bool new_xid_found);
586 :
/* Forward declarations of the remaining file-private routines. */
587 : static void apply_worker_exit(void);
588 :
/*
 * Internal workers behind the apply_handle_* message handlers, plus the
 * local tuple lookup helpers.
 * NOTE(review): "localindexoid"/"localidxoid" presumably name the local
 * index used to find the target tuple -- confirm at the definitions.
 */
589 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
590 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
591 : ResultRelInfo *relinfo,
592 : TupleTableSlot *remoteslot);
593 : static void apply_handle_update_internal(ApplyExecutionData *edata,
594 : ResultRelInfo *relinfo,
595 : TupleTableSlot *remoteslot,
596 : LogicalRepTupleData *newtup,
597 : Oid localindexoid);
598 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
599 : ResultRelInfo *relinfo,
600 : TupleTableSlot *remoteslot,
601 : Oid localindexoid);
602 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
603 : LogicalRepRelation *remoterel,
604 : Oid localidxoid,
605 : TupleTableSlot *remoteslot,
606 : TupleTableSlot **localslot);
607 : static bool FindDeletedTupleInLocalRel(Relation localrel,
608 : Oid localidxoid,
609 : TupleTableSlot *remoteslot,
610 : TransactionId *delete_xid,
611 : RepOriginId *delete_origin,
612 : TimestampTz *delete_time);
613 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
614 : TupleTableSlot *remoteslot,
615 : LogicalRepTupleData *newtup,
616 : CmdType operation);
617 :
618 : /* Functions for skipping changes */
619 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
620 : static void stop_skipping_changes(void);
621 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
622 :
623 : /* Functions for apply error callback */
624 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
625 : static inline void reset_apply_error_context_info(void);
626 :
/* Picks the TransApplyAction for a transaction; *winfo is an output parameter. */
627 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
628 : ParallelApplyWorkerInfo **winfo);
629 :
/* NOTE(review): the (int code, Datum arg) signature suggests an exit/shmem-exit style callback -- confirm at the registration site. */
630 : static void replorigin_reset(int code, Datum arg);
631 :
632 : /*
633 : * Form the origin name for the subscription.
634 : *
635 : * This is a common function for tablesync and other workers. Tablesync workers
636 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
637 : *
638 : * Return the name in the supplied buffer.
639 : */
640 : void
641 2670 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
642 : char *originname, Size szoriginname)
643 : {
644 2670 : if (OidIsValid(relid))
645 : {
646 : /* Replication origin name for tablesync workers. */
647 1502 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
648 : }
649 : else
650 : {
651 : /* Replication origin name for non-tablesync workers. */
652 1168 : snprintf(originname, szoriginname, "pg_%u", suboid);
653 : }
654 2670 : }
655 :
656 : /*
657 : * Should this worker apply changes for given relation.
658 : *
659 : * This is mainly needed for initial relation data sync as that runs in
660 : * separate worker process running in parallel and we need some way to skip
661 : * changes coming to the leader apply worker during the sync of a table.
662 : *
663 : * Note we need to do smaller or equals comparison for SYNCDONE state because
664 : * it might hold position of end of initial slot consistent point WAL
665 : * record + 1 (ie start of next record) and next record can be COMMIT of
666 : * transaction we are now processing (which is what we set remote_final_lsn
667 : * to in apply_handle_begin).
668 : *
669 : * Note that for streaming transactions that are being applied in the parallel
670 : * apply worker, we disallow applying changes if the target table in the
671 : * subscription is not in the READY state, because we cannot decide whether to
672 : * apply the change as we won't know remote_final_lsn by that time.
673 : *
674 : * We already checked this in pa_can_start() before assigning the
675 : * streaming transaction to the parallel worker, but it also needs to be
676 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
677 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
678 : * while applying this transaction.
679 : */
680 : static bool
681 296508 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
682 : {
683 296508 : switch (MyLogicalRepWorker->type)
684 : {
685 0 : case WORKERTYPE_TABLESYNC:
686 0 : return MyLogicalRepWorker->relid == rel->localreloid;
687 :
688 136958 : case WORKERTYPE_PARALLEL_APPLY:
689 : /* We don't synchronize rel's that are in unknown state. */
690 136958 : if (rel->state != SUBREL_STATE_READY &&
691 0 : rel->state != SUBREL_STATE_UNKNOWN)
692 0 : ereport(ERROR,
693 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
694 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
695 : MySubscription->name),
696 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
697 :
698 136958 : return rel->state == SUBREL_STATE_READY;
699 :
700 159550 : case WORKERTYPE_APPLY:
701 159698 : return (rel->state == SUBREL_STATE_READY ||
702 148 : (rel->state == SUBREL_STATE_SYNCDONE &&
703 48 : rel->statelsn <= remote_final_lsn));
704 :
705 0 : case WORKERTYPE_SEQUENCESYNC:
706 : /* Should never happen. */
707 0 : elog(ERROR, "sequence synchronization worker is not expected to apply changes");
708 : break;
709 :
710 0 : case WORKERTYPE_UNKNOWN:
711 : /* Should never happen. */
712 0 : elog(ERROR, "Unknown worker type");
713 : }
714 :
715 0 : return false; /* dummy for compiler */
716 : }
717 :
718 : /*
719 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
720 : *
721 : * Start a transaction, if this is the first step (else we keep using the
722 : * existing transaction).
723 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
724 : */
725 : static void
726 297420 : begin_replication_step(void)
727 : {
728 297420 : SetCurrentStatementStartTimestamp();
729 :
730 297420 : if (!IsTransactionState())
731 : {
732 1944 : StartTransactionCommand();
733 1944 : maybe_reread_subscription();
734 : }
735 :
736 297414 : PushActiveSnapshot(GetTransactionSnapshot());
737 :
738 297414 : MemoryContextSwitchTo(ApplyMessageContext);
739 297414 : }
740 :
/*
 * Close one step of a replication transaction.
 *
 * Every caller of begin_replication_step() must call this as well.
 *
 * The transaction itself is left open; only the command counter is advanced
 * so that the effects of this step become visible.
 */
static void
end_replication_step(void)
{
	/* Drop the snapshot pushed by begin_replication_step(). */
	PopActiveSnapshot();

	CommandCounterIncrement();
}
755 :
756 : /*
757 : * Handle streamed transactions for both the leader apply worker and the
758 : * parallel apply workers.
759 : *
760 : * In the streaming case (receiving a block of the streamed transaction), for
761 : * serialize mode, simply redirect it to a file for the proper toplevel
762 : * transaction, and for parallel mode, the leader apply worker will send the
763 : * changes to parallel apply workers and the parallel apply worker will define
764 : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
765 : * messages will be applied by both leader apply worker and parallel apply
766 : * workers).
767 : *
768 : * Returns true for streamed transactions (when the change is either serialized
769 : * to file or sent to parallel apply worker), false otherwise (regular mode or
770 : * needs to be processed by parallel apply worker).
771 : *
772 : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
773 : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
774 : * to a parallel apply worker.
775 : */
776 : static bool
777 649194 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
778 : {
779 : TransactionId current_xid;
780 : ParallelApplyWorkerInfo *winfo;
781 : TransApplyAction apply_action;
782 : StringInfoData original_msg;
783 :
784 649194 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
785 :
786 : /* not in streaming mode */
787 649194 : if (apply_action == TRANS_LEADER_APPLY)
788 160370 : return false;
789 :
790 : Assert(TransactionIdIsValid(stream_xid));
791 :
792 : /*
793 : * The parallel apply worker needs the xid in this message to decide
794 : * whether to define a savepoint, so save the original message that has
795 : * not moved the cursor after the xid. We will serialize this message to a
796 : * file in PARTIAL_SERIALIZE mode.
797 : */
798 488824 : original_msg = *s;
799 :
800 : /*
801 : * We should have received XID of the subxact as the first part of the
802 : * message, so extract it.
803 : */
804 488824 : current_xid = pq_getmsgint(s, 4);
805 :
806 488824 : if (!TransactionIdIsValid(current_xid))
807 0 : ereport(ERROR,
808 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
809 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
810 :
811 488824 : switch (apply_action)
812 : {
813 205026 : case TRANS_LEADER_SERIALIZE:
814 : Assert(stream_fd);
815 :
816 : /* Add the new subxact to the array (unless already there). */
817 205026 : subxact_info_add(current_xid);
818 :
819 : /* Write the change to the current file */
820 205026 : stream_write_change(action, s);
821 205026 : return true;
822 :
823 136772 : case TRANS_LEADER_SEND_TO_PARALLEL:
824 : Assert(winfo);
825 :
826 : /*
827 : * XXX The publisher side doesn't always send relation/type update
828 : * messages after the streaming transaction, so also update the
829 : * relation/type in leader apply worker. See function
830 : * cleanup_rel_sync_cache.
831 : */
832 136772 : if (pa_send_data(winfo, s->len, s->data))
833 136772 : return (action != LOGICAL_REP_MSG_RELATION &&
834 : action != LOGICAL_REP_MSG_TYPE);
835 :
836 : /*
837 : * Switch to serialize mode when we are not able to send the
838 : * change to parallel apply worker.
839 : */
840 0 : pa_switch_to_partial_serialize(winfo, false);
841 :
842 : /* fall through */
843 10012 : case TRANS_LEADER_PARTIAL_SERIALIZE:
844 10012 : stream_write_change(action, &original_msg);
845 :
846 : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
847 10012 : return (action != LOGICAL_REP_MSG_RELATION &&
848 : action != LOGICAL_REP_MSG_TYPE);
849 :
850 137014 : case TRANS_PARALLEL_APPLY:
851 137014 : parallel_stream_nchanges += 1;
852 :
853 : /* Define a savepoint for a subxact if needed. */
854 137014 : pa_start_subtrans(current_xid, stream_xid);
855 137014 : return false;
856 :
857 0 : default:
858 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
859 : return false; /* silence compiler warning */
860 : }
861 : }
862 :
863 : /*
864 : * Executor state preparation for evaluation of constraint expressions,
865 : * indexes and triggers for the specified relation.
866 : *
867 : * Note that the caller must open and close any indexes to be updated.
868 : */
869 : static ApplyExecutionData *
870 296348 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
871 : {
872 : ApplyExecutionData *edata;
873 : EState *estate;
874 : RangeTblEntry *rte;
875 296348 : List *perminfos = NIL;
876 : ResultRelInfo *resultRelInfo;
877 :
878 296348 : edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
879 296348 : edata->targetRel = rel;
880 :
881 296348 : edata->estate = estate = CreateExecutorState();
882 :
883 296348 : rte = makeNode(RangeTblEntry);
884 296348 : rte->rtekind = RTE_RELATION;
885 296348 : rte->relid = RelationGetRelid(rel->localrel);
886 296348 : rte->relkind = rel->localrel->rd_rel->relkind;
887 296348 : rte->rellockmode = AccessShareLock;
888 :
889 296348 : addRTEPermissionInfo(&perminfos, rte);
890 :
891 296348 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
892 : bms_make_singleton(1));
893 :
894 296348 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
895 :
896 : /*
897 : * Use Relation opened by logicalrep_rel_open() instead of opening it
898 : * again.
899 : */
900 296348 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
901 :
902 : /*
903 : * We put the ResultRelInfo in the es_opened_result_relations list, even
904 : * though we don't populate the es_result_relations array. That's a bit
905 : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
906 : *
907 : * ExecOpenIndices() is not called here either, each execution path doing
908 : * an apply operation being responsible for that.
909 : */
910 296348 : estate->es_opened_result_relations =
911 296348 : lappend(estate->es_opened_result_relations, resultRelInfo);
912 :
913 296348 : estate->es_output_cid = GetCurrentCommandId(true);
914 :
915 : /* Prepare to catch AFTER triggers. */
916 296348 : AfterTriggerBeginQuery();
917 :
918 : /* other fields of edata remain NULL for now */
919 :
920 296348 : return edata;
921 : }
922 :
923 : /*
924 : * Finish any operations related to the executor state created by
925 : * create_edata_for_relation().
926 : */
927 : static void
928 296242 : finish_edata(ApplyExecutionData *edata)
929 : {
930 296242 : EState *estate = edata->estate;
931 :
932 : /* Handle any queued AFTER triggers. */
933 296242 : AfterTriggerEndQuery(estate);
934 :
935 : /* Shut down tuple routing, if any was done. */
936 296242 : if (edata->proute)
937 148 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
938 :
939 : /*
940 : * Cleanup. It might seem that we should call ExecCloseResultRelations()
941 : * here, but we intentionally don't. It would close the rel we added to
942 : * es_opened_result_relations above, which is wrong because we took no
943 : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
944 : * any other relations opened during execution.
945 : */
946 296242 : ExecResetTupleTable(estate->es_tupleTable, false);
947 296242 : FreeExecutorState(estate);
948 296242 : pfree(edata);
949 296242 : }
950 :
951 : /*
952 : * Executes default values for columns for which we can't map to remote
953 : * relation columns.
954 : *
955 : * This allows us to support tables which have more columns on the downstream
956 : * than on the upstream.
957 : */
958 : static void
959 151826 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
960 : TupleTableSlot *slot)
961 : {
962 151826 : TupleDesc desc = RelationGetDescr(rel->localrel);
963 151826 : int num_phys_attrs = desc->natts;
964 : int i;
965 : int attnum,
966 151826 : num_defaults = 0;
967 : int *defmap;
968 : ExprState **defexprs;
969 : ExprContext *econtext;
970 :
971 151826 : econtext = GetPerTupleExprContext(estate);
972 :
973 : /* We got all the data via replication, no need to evaluate anything. */
974 151826 : if (num_phys_attrs == rel->remoterel.natts)
975 71536 : return;
976 :
977 80290 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
978 80290 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
979 :
980 : Assert(rel->attrmap->maplen == num_phys_attrs);
981 421326 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
982 : {
983 341036 : CompactAttribute *cattr = TupleDescCompactAttr(desc, attnum);
984 : Expr *defexpr;
985 :
986 341036 : if (cattr->attisdropped || cattr->attgenerated)
987 18 : continue;
988 :
989 341018 : if (rel->attrmap->attnums[attnum] >= 0)
990 184536 : continue;
991 :
992 156482 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
993 :
994 156482 : if (defexpr != NULL)
995 : {
996 : /* Run the expression through planner */
997 140262 : defexpr = expression_planner(defexpr);
998 :
999 : /* Initialize executable expression in copycontext */
1000 140262 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
1001 140262 : defmap[num_defaults] = attnum;
1002 140262 : num_defaults++;
1003 : }
1004 : }
1005 :
1006 220552 : for (i = 0; i < num_defaults; i++)
1007 140262 : slot->tts_values[defmap[i]] =
1008 140262 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
1009 : }
1010 :
1011 : /*
1012 : * Store tuple data into slot.
1013 : *
1014 : * Incoming data can be either text or binary format.
1015 : */
1016 : static void
1017 296372 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
1018 : LogicalRepTupleData *tupleData)
1019 : {
1020 296372 : int natts = slot->tts_tupleDescriptor->natts;
1021 : int i;
1022 :
1023 296372 : ExecClearTuple(slot);
1024 :
1025 : /* Call the "in" function for each non-dropped, non-null attribute */
1026 : Assert(natts == rel->attrmap->maplen);
1027 1315820 : for (i = 0; i < natts; i++)
1028 : {
1029 1019448 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1030 1019448 : int remoteattnum = rel->attrmap->attnums[i];
1031 :
1032 1019448 : if (!att->attisdropped && remoteattnum >= 0)
1033 605614 : {
1034 605614 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1035 :
1036 : Assert(remoteattnum < tupleData->ncols);
1037 :
1038 : /* Set attnum for error callback */
1039 605614 : apply_error_callback_arg.remote_attnum = remoteattnum;
1040 :
1041 605614 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1042 : {
1043 : Oid typinput;
1044 : Oid typioparam;
1045 :
1046 284766 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1047 569532 : slot->tts_values[i] =
1048 284766 : OidInputFunctionCall(typinput, colvalue->data,
1049 : typioparam, att->atttypmod);
1050 284766 : slot->tts_isnull[i] = false;
1051 : }
1052 320848 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1053 : {
1054 : Oid typreceive;
1055 : Oid typioparam;
1056 :
1057 : /*
1058 : * In some code paths we may be asked to re-parse the same
1059 : * tuple data. Reset the StringInfo's cursor so that works.
1060 : */
1061 220188 : colvalue->cursor = 0;
1062 :
1063 220188 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1064 440376 : slot->tts_values[i] =
1065 220188 : OidReceiveFunctionCall(typreceive, colvalue,
1066 : typioparam, att->atttypmod);
1067 :
1068 : /* Trouble if it didn't eat the whole buffer */
1069 220188 : if (colvalue->cursor != colvalue->len)
1070 0 : ereport(ERROR,
1071 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1072 : errmsg("incorrect binary data format in logical replication column %d",
1073 : remoteattnum + 1)));
1074 220188 : slot->tts_isnull[i] = false;
1075 : }
1076 : else
1077 : {
1078 : /*
1079 : * NULL value from remote. (We don't expect to see
1080 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1081 : * NULL.)
1082 : */
1083 100660 : slot->tts_values[i] = (Datum) 0;
1084 100660 : slot->tts_isnull[i] = true;
1085 : }
1086 :
1087 : /* Reset attnum for error callback */
1088 605614 : apply_error_callback_arg.remote_attnum = -1;
1089 : }
1090 : else
1091 : {
1092 : /*
1093 : * We assign NULL to dropped attributes and missing values
1094 : * (missing values should be later filled using
1095 : * slot_fill_defaults).
1096 : */
1097 413834 : slot->tts_values[i] = (Datum) 0;
1098 413834 : slot->tts_isnull[i] = true;
1099 : }
1100 : }
1101 :
1102 296372 : ExecStoreVirtualTuple(slot);
1103 296372 : }
1104 :
1105 : /*
1106 : * Replace updated columns with data from the LogicalRepTupleData struct.
1107 : * This is somewhat similar to heap_modify_tuple but also calls the type
1108 : * input functions on the user data.
1109 : *
1110 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
1111 : * columns provided in "tupleData" and leaving others as-is.
1112 : *
1113 : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
1114 : * storage for "srcslot". This is OK for current usage, but someday we may
1115 : * need to materialize "slot" at the end to make it independent of "srcslot".
1116 : */
1117 : static void
1118 63850 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
1119 : LogicalRepRelMapEntry *rel,
1120 : LogicalRepTupleData *tupleData)
1121 : {
1122 63850 : int natts = slot->tts_tupleDescriptor->natts;
1123 : int i;
1124 :
1125 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1126 63850 : ExecClearTuple(slot);
1127 :
1128 : /*
1129 : * Copy all the column data from srcslot, so that we'll have valid values
1130 : * for unreplaced columns.
1131 : */
1132 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
1133 63850 : slot_getallattrs(srcslot);
1134 63850 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1135 63850 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1136 :
1137 : /* Call the "in" function for each replaced attribute */
1138 : Assert(natts == rel->attrmap->maplen);
1139 318568 : for (i = 0; i < natts; i++)
1140 : {
1141 254718 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1142 254718 : int remoteattnum = rel->attrmap->attnums[i];
1143 :
1144 254718 : if (remoteattnum < 0)
1145 117038 : continue;
1146 :
1147 : Assert(remoteattnum < tupleData->ncols);
1148 :
1149 137680 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1150 : {
1151 137674 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1152 :
1153 : /* Set attnum for error callback */
1154 137674 : apply_error_callback_arg.remote_attnum = remoteattnum;
1155 :
1156 137674 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1157 : {
1158 : Oid typinput;
1159 : Oid typioparam;
1160 :
1161 50866 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1162 101732 : slot->tts_values[i] =
1163 50866 : OidInputFunctionCall(typinput, colvalue->data,
1164 : typioparam, att->atttypmod);
1165 50866 : slot->tts_isnull[i] = false;
1166 : }
1167 86808 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1168 : {
1169 : Oid typreceive;
1170 : Oid typioparam;
1171 :
1172 : /*
1173 : * In some code paths we may be asked to re-parse the same
1174 : * tuple data. Reset the StringInfo's cursor so that works.
1175 : */
1176 86712 : colvalue->cursor = 0;
1177 :
1178 86712 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1179 173424 : slot->tts_values[i] =
1180 86712 : OidReceiveFunctionCall(typreceive, colvalue,
1181 : typioparam, att->atttypmod);
1182 :
1183 : /* Trouble if it didn't eat the whole buffer */
1184 86712 : if (colvalue->cursor != colvalue->len)
1185 0 : ereport(ERROR,
1186 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1187 : errmsg("incorrect binary data format in logical replication column %d",
1188 : remoteattnum + 1)));
1189 86712 : slot->tts_isnull[i] = false;
1190 : }
1191 : else
1192 : {
1193 : /* must be LOGICALREP_COLUMN_NULL */
1194 96 : slot->tts_values[i] = (Datum) 0;
1195 96 : slot->tts_isnull[i] = true;
1196 : }
1197 :
1198 : /* Reset attnum for error callback */
1199 137674 : apply_error_callback_arg.remote_attnum = -1;
1200 : }
1201 : }
1202 :
1203 : /* And finally, declare that "slot" contains a valid virtual tuple */
1204 63850 : ExecStoreVirtualTuple(slot);
1205 63850 : }
1206 :
1207 : /*
1208 : * Handle BEGIN message.
1209 : */
1210 : static void
1211 998 : apply_handle_begin(StringInfo s)
1212 : {
1213 : LogicalRepBeginData begin_data;
1214 :
1215 : /* There must not be an active streaming transaction. */
1216 : Assert(!TransactionIdIsValid(stream_xid));
1217 :
1218 998 : logicalrep_read_begin(s, &begin_data);
1219 998 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1220 :
1221 998 : remote_final_lsn = begin_data.final_lsn;
1222 :
1223 998 : maybe_start_skipping_changes(begin_data.final_lsn);
1224 :
1225 998 : in_remote_transaction = true;
1226 :
1227 998 : pgstat_report_activity(STATE_RUNNING, NULL);
1228 998 : }
1229 :
1230 : /*
1231 : * Handle COMMIT message.
1232 : *
1233 : * TODO, support tracking of multiple origins
1234 : */
1235 : static void
1236 870 : apply_handle_commit(StringInfo s)
1237 : {
1238 : LogicalRepCommitData commit_data;
1239 :
1240 870 : logicalrep_read_commit(s, &commit_data);
1241 :
1242 870 : if (commit_data.commit_lsn != remote_final_lsn)
1243 0 : ereport(ERROR,
1244 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1245 : errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1246 : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1247 : LSN_FORMAT_ARGS(remote_final_lsn))));
1248 :
1249 870 : apply_handle_commit_internal(&commit_data);
1250 :
1251 : /*
1252 : * Process any tables that are being synchronized in parallel, as well as
1253 : * any newly added tables or sequences.
1254 : */
1255 870 : ProcessSyncingRelations(commit_data.end_lsn);
1256 :
1257 870 : pgstat_report_activity(STATE_IDLE, NULL);
1258 870 : reset_apply_error_context_info();
1259 870 : }
1260 :
1261 : /*
1262 : * Handle BEGIN PREPARE message.
1263 : */
1264 : static void
1265 32 : apply_handle_begin_prepare(StringInfo s)
1266 : {
1267 : LogicalRepPreparedTxnData begin_data;
1268 :
1269 : /* Tablesync should never receive prepare. */
1270 32 : if (am_tablesync_worker())
1271 0 : ereport(ERROR,
1272 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1273 : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1274 :
1275 : /* There must not be an active streaming transaction. */
1276 : Assert(!TransactionIdIsValid(stream_xid));
1277 :
1278 32 : logicalrep_read_begin_prepare(s, &begin_data);
1279 32 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1280 :
1281 32 : remote_final_lsn = begin_data.prepare_lsn;
1282 :
1283 32 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1284 :
1285 32 : in_remote_transaction = true;
1286 :
1287 32 : pgstat_report_activity(STATE_RUNNING, NULL);
1288 32 : }
1289 :
1290 : /*
1291 : * Common function to prepare the GID.
1292 : */
1293 : static void
1294 46 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1295 : {
1296 : char gid[GIDSIZE];
1297 :
1298 : /*
1299 : * Compute unique GID for two_phase transactions. We don't use GID of
1300 : * prepared transaction sent by server as that can lead to deadlock when
1301 : * we have multiple subscriptions from same node point to publications on
1302 : * the same node. See comments atop worker.c
1303 : */
1304 46 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1305 : gid, sizeof(gid));
1306 :
1307 : /*
1308 : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1309 : * called within the PrepareTransactionBlock below.
1310 : */
1311 46 : if (!IsTransactionBlock())
1312 : {
1313 46 : BeginTransactionBlock();
1314 46 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1315 : }
1316 :
1317 : /*
1318 : * Update origin state so we can restart streaming from correct position
1319 : * in case of crash.
1320 : */
1321 46 : replorigin_session_origin_lsn = prepare_data->end_lsn;
1322 46 : replorigin_session_origin_timestamp = prepare_data->prepare_time;
1323 :
1324 46 : PrepareTransactionBlock(gid);
1325 46 : }
1326 :
1327 : /*
1328 : * Handle PREPARE message.
1329 : */
1330 : static void
1331 30 : apply_handle_prepare(StringInfo s)
1332 : {
1333 : LogicalRepPreparedTxnData prepare_data;
1334 :
1335 30 : logicalrep_read_prepare(s, &prepare_data);
1336 :
1337 30 : if (prepare_data.prepare_lsn != remote_final_lsn)
1338 0 : ereport(ERROR,
1339 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1340 : errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1341 : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1342 : LSN_FORMAT_ARGS(remote_final_lsn))));
1343 :
1344 : /*
1345 : * Unlike commit, here, we always prepare the transaction even though no
1346 : * change has happened in this transaction or all changes are skipped. It
1347 : * is done this way because at commit prepared time, we won't know whether
1348 : * we have skipped preparing a transaction because of those reasons.
1349 : *
1350 : * XXX, We can optimize such that at commit prepared time, we first check
1351 : * whether we have prepared the transaction or not but that doesn't seem
1352 : * worthwhile because such cases shouldn't be common.
1353 : */
1354 30 : begin_replication_step();
1355 :
1356 30 : apply_handle_prepare_internal(&prepare_data);
1357 :
1358 30 : end_replication_step();
1359 30 : CommitTransactionCommand();
1360 28 : pgstat_report_stat(false);
1361 :
1362 : /*
1363 : * It is okay not to set the local_end LSN for the prepare because we
1364 : * always flush the prepare record. So, we can send the acknowledgment of
1365 : * the remote_end LSN as soon as prepare is finished.
1366 : *
1367 : * XXX For the sake of consistency with commit, we could have set it with
1368 : * the LSN of prepare but as of now we don't track that value similar to
1369 : * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
1370 : * it.
1371 : */
1372 28 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1373 :
1374 28 : in_remote_transaction = false;
1375 :
1376 : /*
1377 : * Process any tables that are being synchronized in parallel, as well as
1378 : * any newly added tables or sequences.
1379 : */
1380 28 : ProcessSyncingRelations(prepare_data.end_lsn);
1381 :
1382 : /*
1383 : * Since we have already prepared the transaction, in a case where the
1384 : * server crashes before clearing the subskiplsn, it will be left but the
1385 : * transaction won't be resent. But that's okay because it's a rare case
1386 : * and the subskiplsn will be cleared when finishing the next transaction.
1387 : */
1388 28 : stop_skipping_changes();
1389 28 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1390 :
1391 28 : pgstat_report_activity(STATE_IDLE, NULL);
1392 28 : reset_apply_error_context_info();
1393 28 : }
1394 :
1395 : /*
1396 : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1397 : *
1398 : * Note that we don't need to wait here if the transaction was prepared in a
1399 : * parallel apply worker. In that case, we have already waited for the prepare
1400 : * to finish in apply_handle_stream_prepare() which will ensure all the
1401 : * operations in that transaction have happened in the subscriber, so no
1402 : * concurrent transaction can cause deadlock or transaction dependency issues.
1403 : */
1404 : static void
1405 40 : apply_handle_commit_prepared(StringInfo s)
1406 : {
1407 : LogicalRepCommitPreparedTxnData prepare_data;
1408 : char gid[GIDSIZE];
1409 :
1410 40 : logicalrep_read_commit_prepared(s, &prepare_data);
1411 40 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1412 :
1413 : /* Compute GID for two_phase transactions. */
1414 40 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1415 : gid, sizeof(gid));
1416 :
1417 : /* There is no transaction when COMMIT PREPARED is called */
1418 40 : begin_replication_step();
1419 :
1420 : /*
1421 : * Update origin state so we can restart streaming from correct position
1422 : * in case of crash.
1423 : */
1424 40 : replorigin_session_origin_lsn = prepare_data.end_lsn;
1425 40 : replorigin_session_origin_timestamp = prepare_data.commit_time;
1426 :
1427 40 : FinishPreparedTransaction(gid, true);
1428 40 : end_replication_step();
1429 40 : CommitTransactionCommand();
1430 40 : pgstat_report_stat(false);
1431 :
1432 40 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1433 40 : in_remote_transaction = false;
1434 :
1435 : /*
1436 : * Process any tables that are being synchronized in parallel, as well as
1437 : * any newly added tables or sequences.
1438 : */
1439 40 : ProcessSyncingRelations(prepare_data.end_lsn);
1440 :
1441 40 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1442 :
1443 40 : pgstat_report_activity(STATE_IDLE, NULL);
1444 40 : reset_apply_error_context_info();
1445 40 : }
1446 :
1447 : /*
1448 : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1449 : *
1450 : * Note that we don't need to wait here if the transaction was prepared in a
1451 : * parallel apply worker. In that case, we have already waited for the prepare
1452 : * to finish in apply_handle_stream_prepare() which will ensure all the
1453 : * operations in that transaction have happened in the subscriber, so no
1454 : * concurrent transaction can cause deadlock or transaction dependency issues.
1455 : */
1456 : static void
1457 10 : apply_handle_rollback_prepared(StringInfo s)
1458 : {
1459 : LogicalRepRollbackPreparedTxnData rollback_data;
1460 : char gid[GIDSIZE];
1461 :
1462 10 : logicalrep_read_rollback_prepared(s, &rollback_data);
1463 10 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1464 :
1465 : /* Compute GID for two_phase transactions. */
1466 10 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1467 : gid, sizeof(gid));
1468 :
1469 : /*
1470 : * It is possible that we haven't received prepare because it occurred
1471 : * before walsender reached a consistent point or the two_phase was still
1472 : * not enabled by that time, so in such cases, we need to skip rollback
1473 : * prepared.
1474 : */
1475 10 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1476 : rollback_data.prepare_time))
1477 : {
1478 : /*
1479 : * Update origin state so we can restart streaming from correct
1480 : * position in case of crash.
1481 : */
1482 10 : replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1483 10 : replorigin_session_origin_timestamp = rollback_data.rollback_time;
1484 :
1485 : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1486 10 : begin_replication_step();
1487 10 : FinishPreparedTransaction(gid, false);
1488 10 : end_replication_step();
1489 10 : CommitTransactionCommand();
1490 :
1491 10 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1492 : }
1493 :
1494 10 : pgstat_report_stat(false);
1495 :
1496 : /*
1497 : * It is okay not to set the local_end LSN for the rollback of prepared
1498 : * transaction because we always flush the WAL record for it. See
1499 : * apply_handle_prepare.
1500 : */
1501 10 : store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
1502 10 : in_remote_transaction = false;
1503 :
1504 : /*
1505 : * Process any tables that are being synchronized in parallel, as well as
1506 : * any newly added tables or sequences.
1507 : */
1508 10 : ProcessSyncingRelations(rollback_data.rollback_end_lsn);
1509 :
1510 10 : pgstat_report_activity(STATE_IDLE, NULL);
1511 10 : reset_apply_error_context_info();
1512 10 : }
1513 :
1514 : /*
1515 : * Handle STREAM PREPARE.
1516 : */
1517 : static void
1518 22 : apply_handle_stream_prepare(StringInfo s)
1519 : {
1520 : LogicalRepPreparedTxnData prepare_data;
1521 : ParallelApplyWorkerInfo *winfo;
1522 : TransApplyAction apply_action;
1523 :
1524 : /* Save the message before it is consumed. */
1525 22 : StringInfoData original_msg = *s;
1526 :
1527 22 : if (in_streamed_transaction)
1528 0 : ereport(ERROR,
1529 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1530 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1531 :
1532 : /* Tablesync should never receive prepare. */
1533 22 : if (am_tablesync_worker())
1534 0 : ereport(ERROR,
1535 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1536 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1537 :
1538 22 : logicalrep_read_stream_prepare(s, &prepare_data);
1539 22 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1540 :
1541 22 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1542 :
1543 22 : switch (apply_action)
1544 : {
1545 10 : case TRANS_LEADER_APPLY:
1546 :
1547 : /*
1548 : * The transaction has been serialized to file, so replay all the
1549 : * spooled operations.
1550 : */
1551 10 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1552 : prepare_data.xid, prepare_data.prepare_lsn);
1553 :
1554 : /* Mark the transaction as prepared. */
1555 10 : apply_handle_prepare_internal(&prepare_data);
1556 :
1557 10 : CommitTransactionCommand();
1558 :
1559 : /*
1560 : * It is okay not to set the local_end LSN for the prepare because
1561 : * we always flush the prepare record. See apply_handle_prepare.
1562 : */
1563 10 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1564 :
1565 10 : in_remote_transaction = false;
1566 :
1567 : /* Unlink the files with serialized changes and subxact info. */
1568 10 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1569 :
1570 10 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1571 10 : break;
1572 :
1573 4 : case TRANS_LEADER_SEND_TO_PARALLEL:
1574 : Assert(winfo);
1575 :
1576 4 : if (pa_send_data(winfo, s->len, s->data))
1577 : {
1578 : /* Finish processing the streaming transaction. */
1579 4 : pa_xact_finish(winfo, prepare_data.end_lsn);
1580 4 : break;
1581 : }
1582 :
1583 : /*
1584 : * Switch to serialize mode when we are not able to send the
1585 : * change to parallel apply worker.
1586 : */
1587 0 : pa_switch_to_partial_serialize(winfo, true);
1588 :
1589 : /* fall through */
1590 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1591 : Assert(winfo);
1592 :
1593 2 : stream_open_and_write_change(prepare_data.xid,
1594 : LOGICAL_REP_MSG_STREAM_PREPARE,
1595 : &original_msg);
1596 :
1597 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1598 :
1599 : /* Finish processing the streaming transaction. */
1600 2 : pa_xact_finish(winfo, prepare_data.end_lsn);
1601 2 : break;
1602 :
1603 6 : case TRANS_PARALLEL_APPLY:
1604 :
1605 : /*
1606 : * If the parallel apply worker is applying spooled messages then
1607 : * close the file before preparing.
1608 : */
1609 6 : if (stream_fd)
1610 2 : stream_close_file();
1611 :
1612 6 : begin_replication_step();
1613 :
1614 : /* Mark the transaction as prepared. */
1615 6 : apply_handle_prepare_internal(&prepare_data);
1616 :
1617 6 : end_replication_step();
1618 :
1619 6 : CommitTransactionCommand();
1620 :
1621 : /*
1622 : * It is okay not to set the local_end LSN for the prepare because
1623 : * we always flush the prepare record. See apply_handle_prepare.
1624 : */
1625 6 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1626 :
1627 6 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1628 6 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1629 :
1630 6 : pa_reset_subtrans();
1631 :
1632 6 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1633 6 : break;
1634 :
1635 0 : default:
1636 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1637 : break;
1638 : }
1639 :
1640 22 : pgstat_report_stat(false);
1641 :
1642 : /*
1643 : * Process any tables that are being synchronized in parallel, as well as
1644 : * any newly added tables or sequences.
1645 : */
1646 22 : ProcessSyncingRelations(prepare_data.end_lsn);
1647 :
1648 : /*
1649 : * Similar to prepare case, the subskiplsn could be left in a case of
1650 : * server crash but it's okay. See the comments in apply_handle_prepare().
1651 : */
1652 22 : stop_skipping_changes();
1653 22 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1654 :
1655 22 : pgstat_report_activity(STATE_IDLE, NULL);
1656 :
1657 22 : reset_apply_error_context_info();
1658 22 : }
1659 :
1660 : /*
1661 : * Handle ORIGIN message.
1662 : *
1663 : * TODO, support tracking of multiple origins
1664 : */
1665 : static void
1666 14 : apply_handle_origin(StringInfo s)
1667 : {
1668 : /*
1669 : * ORIGIN message can only come inside streaming transaction or inside
1670 : * remote transaction and before any actual writes.
1671 : */
1672 14 : if (!in_streamed_transaction &&
1673 20 : (!in_remote_transaction ||
1674 10 : (IsTransactionState() && !am_tablesync_worker())))
1675 0 : ereport(ERROR,
1676 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1677 : errmsg_internal("ORIGIN message sent out of order")));
1678 14 : }
1679 :
1680 : /*
1681 : * Initialize fileset (if not already done).
1682 : *
1683 : * Create a new file when first_segment is true, otherwise open the existing
1684 : * file.
1685 : */
1686 : void
1687 724 : stream_start_internal(TransactionId xid, bool first_segment)
1688 : {
1689 724 : begin_replication_step();
1690 :
1691 : /*
1692 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1693 : * used for the entire duration of the worker so create it in a permanent
1694 : * context. We create this on the very first streaming message from any
1695 : * transaction and then use it for this and other streaming transactions.
1696 : * Now, we could create a fileset at the start of the worker as well but
1697 : * then we won't be sure that it will ever be used.
1698 : */
1699 724 : if (!MyLogicalRepWorker->stream_fileset)
1700 : {
1701 : MemoryContext oldctx;
1702 :
1703 28 : oldctx = MemoryContextSwitchTo(ApplyContext);
1704 :
1705 28 : MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1706 28 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1707 :
1708 28 : MemoryContextSwitchTo(oldctx);
1709 : }
1710 :
1711 : /* Open the spool file for this transaction. */
1712 724 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1713 :
1714 : /* If this is not the first segment, open existing subxact file. */
1715 724 : if (!first_segment)
1716 660 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1717 :
1718 724 : end_replication_step();
1719 724 : }
1720 :
1721 : /*
1722 : * Handle STREAM START message.
1723 : */
1724 : static void
1725 1676 : apply_handle_stream_start(StringInfo s)
1726 : {
1727 : bool first_segment;
1728 : ParallelApplyWorkerInfo *winfo;
1729 : TransApplyAction apply_action;
1730 :
1731 : /* Save the message before it is consumed. */
1732 1676 : StringInfoData original_msg = *s;
1733 :
1734 1676 : if (in_streamed_transaction)
1735 0 : ereport(ERROR,
1736 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1737 : errmsg_internal("duplicate STREAM START message")));
1738 :
1739 : /* There must not be an active streaming transaction. */
1740 : Assert(!TransactionIdIsValid(stream_xid));
1741 :
1742 : /* notify handle methods we're processing a remote transaction */
1743 1676 : in_streamed_transaction = true;
1744 :
1745 : /* extract XID of the top-level transaction */
1746 1676 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1747 :
1748 1676 : if (!TransactionIdIsValid(stream_xid))
1749 0 : ereport(ERROR,
1750 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1751 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1752 :
1753 1676 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1754 :
1755 : /* Try to allocate a worker for the streaming transaction. */
1756 1676 : if (first_segment)
1757 164 : pa_allocate_worker(stream_xid);
1758 :
1759 1676 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1760 :
1761 1676 : switch (apply_action)
1762 : {
1763 684 : case TRANS_LEADER_SERIALIZE:
1764 :
1765 : /*
1766 : * Function stream_start_internal starts a transaction. This
1767 : * transaction will be committed on the stream stop unless it is a
1768 : * tablesync worker in which case it will be committed after
1769 : * processing all the messages. We need this transaction for
1770 : * handling the BufFile, used for serializing the streaming data
1771 : * and subxact info.
1772 : */
1773 684 : stream_start_internal(stream_xid, first_segment);
1774 684 : break;
1775 :
1776 484 : case TRANS_LEADER_SEND_TO_PARALLEL:
1777 : Assert(winfo);
1778 :
1779 : /*
1780 : * Once we start serializing the changes, the parallel apply
1781 : * worker will wait for the leader to release the stream lock
1782 : * until the end of the transaction. So, we don't need to release
1783 : * the lock or increment the stream count in that case.
1784 : */
1785 484 : if (pa_send_data(winfo, s->len, s->data))
1786 : {
1787 : /*
1788 : * Unlock the shared object lock so that the parallel apply
1789 : * worker can continue to receive changes.
1790 : */
1791 476 : if (!first_segment)
1792 430 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1793 :
1794 : /*
1795 : * Increment the number of streaming blocks waiting to be
1796 : * processed by parallel apply worker.
1797 : */
1798 476 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1799 :
1800 : /* Cache the parallel apply worker for this transaction. */
1801 476 : pa_set_stream_apply_worker(winfo);
1802 476 : break;
1803 : }
1804 :
1805 : /*
1806 : * Switch to serialize mode when we are not able to send the
1807 : * change to parallel apply worker.
1808 : */
1809 8 : pa_switch_to_partial_serialize(winfo, !first_segment);
1810 :
1811 : /* fall through */
1812 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1813 : Assert(winfo);
1814 :
1815 : /*
1816 : * Open the spool file unless it was already opened when switching
1817 : * to serialize mode. The transaction started in
1818 : * stream_start_internal will be committed on the stream stop.
1819 : */
1820 30 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1821 22 : stream_start_internal(stream_xid, first_segment);
1822 :
1823 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1824 :
1825 : /* Cache the parallel apply worker for this transaction. */
1826 30 : pa_set_stream_apply_worker(winfo);
1827 30 : break;
1828 :
1829 486 : case TRANS_PARALLEL_APPLY:
1830 486 : if (first_segment)
1831 : {
1832 : /* Hold the lock until the end of the transaction. */
1833 54 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1834 54 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1835 :
1836 : /*
1837 : * Signal the leader apply worker, as it may be waiting for
1838 : * us.
1839 : */
1840 54 : logicalrep_worker_wakeup(WORKERTYPE_APPLY,
1841 54 : MyLogicalRepWorker->subid, InvalidOid);
1842 : }
1843 :
1844 486 : parallel_stream_nchanges = 0;
1845 486 : break;
1846 :
1847 0 : default:
1848 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1849 : break;
1850 : }
1851 :
1852 1676 : pgstat_report_activity(STATE_RUNNING, NULL);
1853 1676 : }
1854 :
1855 : /*
1856 : * Update the information about subxacts and close the file.
1857 : *
1858 : * This function should be called when the stream_start_internal function has
1859 : * been called.
1860 : */
1861 : void
1862 724 : stream_stop_internal(TransactionId xid)
1863 : {
1864 : /*
1865 : * Serialize information about subxacts for the toplevel transaction, then
1866 : * close the stream messages spool file.
1867 : */
1868 724 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1869 724 : stream_close_file();
1870 :
1871 : /* We must be in a valid transaction state */
1872 : Assert(IsTransactionState());
1873 :
1874 : /* Commit the per-stream transaction */
1875 724 : CommitTransactionCommand();
1876 :
1877 : /* Reset per-stream context */
1878 724 : MemoryContextReset(LogicalStreamingContext);
1879 724 : }
1880 :
1881 : /*
1882 : * Handle STREAM STOP message.
1883 : */
1884 : static void
1885 1674 : apply_handle_stream_stop(StringInfo s)
1886 : {
1887 : ParallelApplyWorkerInfo *winfo;
1888 : TransApplyAction apply_action;
1889 :
1890 1674 : if (!in_streamed_transaction)
1891 0 : ereport(ERROR,
1892 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1893 : errmsg_internal("STREAM STOP message without STREAM START")));
1894 :
1895 1674 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1896 :
1897 1674 : switch (apply_action)
1898 : {
1899 684 : case TRANS_LEADER_SERIALIZE:
1900 684 : stream_stop_internal(stream_xid);
1901 684 : break;
1902 :
1903 476 : case TRANS_LEADER_SEND_TO_PARALLEL:
1904 : Assert(winfo);
1905 :
1906 : /*
1907 : * Lock before sending the STREAM_STOP message so that the leader
1908 : * can hold the lock first and the parallel apply worker will wait
1909 : * for leader to release the lock. See Locking Considerations atop
1910 : * applyparallelworker.c.
1911 : */
1912 476 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1913 :
1914 476 : if (pa_send_data(winfo, s->len, s->data))
1915 : {
1916 476 : pa_set_stream_apply_worker(NULL);
1917 476 : break;
1918 : }
1919 :
1920 : /*
1921 : * Switch to serialize mode when we are not able to send the
1922 : * change to parallel apply worker.
1923 : */
1924 0 : pa_switch_to_partial_serialize(winfo, true);
1925 :
1926 : /* fall through */
1927 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1928 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1929 30 : stream_stop_internal(stream_xid);
1930 30 : pa_set_stream_apply_worker(NULL);
1931 30 : break;
1932 :
1933 484 : case TRANS_PARALLEL_APPLY:
1934 484 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1935 : parallel_stream_nchanges);
1936 :
1937 : /*
1938 : * By the time parallel apply worker is processing the changes in
1939 : * the current streaming block, the leader apply worker may have
1940 : * sent multiple streaming blocks. This can lead to parallel apply
1941 : * worker start waiting even when there are more chunk of streams
1942 : * in the queue. So, try to lock only if there is no message left
1943 : * in the queue. See Locking Considerations atop
1944 : * applyparallelworker.c.
1945 : *
1946 : * Note that here we have a race condition where we can start
1947 : * waiting even when there are pending streaming chunks. This can
1948 : * happen if the leader sends another streaming block and acquires
1949 : * the stream lock again after the parallel apply worker checks
1950 : * that there is no pending streaming block and before it actually
1951 : * starts waiting on a lock. We can handle this case by not
1952 : * allowing the leader to increment the stream block count during
1953 : * the time parallel apply worker acquires the lock but it is not
1954 : * clear whether that is worth the complexity.
1955 : *
1956 : * Now, if this missed chunk contains rollback to savepoint, then
1957 : * there is a risk of deadlock which probably shouldn't happen
1958 : * after restart.
1959 : */
1960 484 : pa_decr_and_wait_stream_block();
1961 480 : break;
1962 :
1963 0 : default:
1964 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1965 : break;
1966 : }
1967 :
1968 1670 : in_streamed_transaction = false;
1969 1670 : stream_xid = InvalidTransactionId;
1970 :
1971 : /*
1972 : * The parallel apply worker could be in a transaction in which case we
1973 : * need to report the state as STATE_IDLEINTRANSACTION.
1974 : */
1975 1670 : if (IsTransactionOrTransactionBlock())
1976 480 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1977 : else
1978 1190 : pgstat_report_activity(STATE_IDLE, NULL);
1979 :
1980 1670 : reset_apply_error_context_info();
1981 1670 : }
1982 :
1983 : /*
1984 : * Helper function to handle STREAM ABORT message when the transaction was
1985 : * serialized to file.
1986 : */
1987 : static void
1988 28 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1989 : {
1990 : /*
1991 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1992 : * just delete the files with serialized info.
1993 : */
1994 28 : if (xid == subxid)
1995 2 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1996 : else
1997 : {
1998 : /*
1999 : * OK, so it's a subxact. We need to read the subxact file for the
2000 : * toplevel transaction, determine the offset tracked for the subxact,
2001 : * and truncate the file with changes. We also remove the subxacts
2002 : * with higher offsets (or rather higher XIDs).
2003 : *
2004 : * We intentionally scan the array from the tail, because we're likely
2005 : * aborting a change for the most recent subtransactions.
2006 : *
2007 : * We can't use the binary search here as subxact XIDs won't
2008 : * necessarily arrive in sorted order, consider the case where we have
2009 : * released the savepoint for multiple subtransactions and then
2010 : * performed rollback to savepoint for one of the earlier
2011 : * sub-transaction.
2012 : */
2013 : int64 i;
2014 : int64 subidx;
2015 : BufFile *fd;
2016 26 : bool found = false;
2017 : char path[MAXPGPATH];
2018 :
2019 26 : subidx = -1;
2020 26 : begin_replication_step();
2021 26 : subxact_info_read(MyLogicalRepWorker->subid, xid);
2022 :
2023 30 : for (i = subxact_data.nsubxacts; i > 0; i--)
2024 : {
2025 22 : if (subxact_data.subxacts[i - 1].xid == subxid)
2026 : {
2027 18 : subidx = (i - 1);
2028 18 : found = true;
2029 18 : break;
2030 : }
2031 : }
2032 :
2033 : /*
2034 : * If it's an empty sub-transaction then we will not find the subxid
2035 : * here so just cleanup the subxact info and return.
2036 : */
2037 26 : if (!found)
2038 : {
2039 : /* Cleanup the subxact info */
2040 8 : cleanup_subxact_info();
2041 8 : end_replication_step();
2042 8 : CommitTransactionCommand();
2043 8 : return;
2044 : }
2045 :
2046 : /* open the changes file */
2047 18 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2048 18 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
2049 : O_RDWR, false);
2050 :
2051 : /* OK, truncate the file at the right offset */
2052 18 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
2053 18 : subxact_data.subxacts[subidx].offset);
2054 18 : BufFileClose(fd);
2055 :
2056 : /* discard the subxacts added later */
2057 18 : subxact_data.nsubxacts = subidx;
2058 :
2059 : /* write the updated subxact list */
2060 18 : subxact_info_write(MyLogicalRepWorker->subid, xid);
2061 :
2062 18 : end_replication_step();
2063 18 : CommitTransactionCommand();
2064 : }
2065 : }
2066 :
2067 : /*
2068 : * Handle STREAM ABORT message.
2069 : */
2070 : static void
2071 76 : apply_handle_stream_abort(StringInfo s)
2072 : {
2073 : TransactionId xid;
2074 : TransactionId subxid;
2075 : LogicalRepStreamAbortData abort_data;
2076 : ParallelApplyWorkerInfo *winfo;
2077 : TransApplyAction apply_action;
2078 :
2079 : /* Save the message before it is consumed. */
2080 76 : StringInfoData original_msg = *s;
2081 : bool toplevel_xact;
2082 :
2083 76 : if (in_streamed_transaction)
2084 0 : ereport(ERROR,
2085 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2086 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
2087 :
2088 : /* We receive abort information only when we can apply in parallel. */
2089 76 : logicalrep_read_stream_abort(s, &abort_data,
2090 76 : MyLogicalRepWorker->parallel_apply);
2091 :
2092 76 : xid = abort_data.xid;
2093 76 : subxid = abort_data.subxid;
2094 76 : toplevel_xact = (xid == subxid);
2095 :
2096 76 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2097 :
2098 76 : apply_action = get_transaction_apply_action(xid, &winfo);
2099 :
2100 76 : switch (apply_action)
2101 : {
2102 28 : case TRANS_LEADER_APPLY:
2103 :
2104 : /*
2105 : * We are in the leader apply worker and the transaction has been
2106 : * serialized to file.
2107 : */
2108 28 : stream_abort_internal(xid, subxid);
2109 :
2110 28 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2111 28 : break;
2112 :
2113 20 : case TRANS_LEADER_SEND_TO_PARALLEL:
2114 : Assert(winfo);
2115 :
2116 : /*
2117 : * For the case of aborting the subtransaction, we increment the
2118 : * number of streaming blocks and take the lock again before
2119 : * sending the STREAM_ABORT to ensure that the parallel apply
2120 : * worker will wait on the lock for the next set of changes after
2121 : * processing the STREAM_ABORT message if it is not already
2122 : * waiting for STREAM_STOP message.
2123 : *
2124 : * It is important to perform this locking before sending the
2125 : * STREAM_ABORT message so that the leader can hold the lock first
2126 : * and the parallel apply worker will wait for the leader to
2127 : * release the lock. This is the same as what we do in
2128 : * apply_handle_stream_stop. See Locking Considerations atop
2129 : * applyparallelworker.c.
2130 : */
2131 20 : if (!toplevel_xact)
2132 : {
2133 18 : pa_unlock_stream(xid, AccessExclusiveLock);
2134 18 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
2135 18 : pa_lock_stream(xid, AccessExclusiveLock);
2136 : }
2137 :
2138 20 : if (pa_send_data(winfo, s->len, s->data))
2139 : {
2140 : /*
2141 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2142 : * wait here for the parallel apply worker to finish as that
2143 : * is not required to maintain the commit order and won't have
2144 : * the risk of failures due to transaction dependencies and
2145 : * deadlocks. However, it is possible that before the parallel
2146 : * worker finishes and we clear the worker info, the xid
2147 : * wraparound happens on the upstream and a new transaction
2148 : * with the same xid can appear and that can lead to duplicate
2149 : * entries in ParallelApplyTxnHash. Yet another problem could
2150 : * be that we may have serialized the changes in partial
2151 : * serialize mode and the file containing xact changes may
2152 : * already exist, and after xid wraparound trying to create
2153 : * the file for the same xid can lead to an error. To avoid
2154 : * these problems, we decide to wait for the aborts to finish.
2155 : *
2156 : * Note, it is okay to not update the flush location position
2157 : * for aborts as in worst case that means such a transaction
2158 : * won't be sent again after restart.
2159 : */
2160 20 : if (toplevel_xact)
2161 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2162 :
2163 20 : break;
2164 : }
2165 :
2166 : /*
2167 : * Switch to serialize mode when we are not able to send the
2168 : * change to parallel apply worker.
2169 : */
2170 0 : pa_switch_to_partial_serialize(winfo, true);
2171 :
2172 : /* fall through */
2173 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2174 : Assert(winfo);
2175 :
2176 : /*
2177 : * Parallel apply worker might have applied some changes, so write
2178 : * the STREAM_ABORT message so that it can rollback the
2179 : * subtransaction if needed.
2180 : */
2181 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
2182 : &original_msg);
2183 :
2184 4 : if (toplevel_xact)
2185 : {
2186 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2187 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2188 : }
2189 4 : break;
2190 :
2191 24 : case TRANS_PARALLEL_APPLY:
2192 :
2193 : /*
2194 : * If the parallel apply worker is applying spooled messages then
2195 : * close the file before aborting.
2196 : */
2197 24 : if (toplevel_xact && stream_fd)
2198 2 : stream_close_file();
2199 :
2200 24 : pa_stream_abort(&abort_data);
2201 :
2202 : /*
2203 : * We need to wait after processing rollback to savepoint for the
2204 : * next set of changes.
2205 : *
2206 : * We have a race condition here due to which we can start waiting
2207 : * here when there are more chunk of streams in the queue. See
2208 : * apply_handle_stream_stop.
2209 : */
2210 24 : if (!toplevel_xact)
2211 20 : pa_decr_and_wait_stream_block();
2212 :
2213 24 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2214 24 : break;
2215 :
2216 0 : default:
2217 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2218 : break;
2219 : }
2220 :
2221 76 : reset_apply_error_context_info();
2222 76 : }
2223 :
2224 : /*
2225 : * Ensure that the passed location is fileset's end.
2226 : */
2227 : static void
2228 8 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2229 : off_t offset)
2230 : {
2231 : char path[MAXPGPATH];
2232 : BufFile *fd;
2233 : int last_fileno;
2234 : off_t last_offset;
2235 :
2236 : Assert(!IsTransactionState());
2237 :
2238 8 : begin_replication_step();
2239 :
2240 8 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2241 :
2242 8 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2243 :
2244 8 : BufFileSeek(fd, 0, 0, SEEK_END);
2245 8 : BufFileTell(fd, &last_fileno, &last_offset);
2246 :
2247 8 : BufFileClose(fd);
2248 :
2249 8 : end_replication_step();
2250 :
2251 8 : if (last_fileno != fileno || last_offset != offset)
2252 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2253 : path);
2254 8 : }
2255 :
2256 : /*
2257 : * Common spoolfile processing.
2258 : */
2259 : void
2260 62 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2261 : XLogRecPtr lsn)
2262 : {
2263 : int nchanges;
2264 : char path[MAXPGPATH];
2265 62 : char *buffer = NULL;
2266 : MemoryContext oldcxt;
2267 : ResourceOwner oldowner;
2268 : int fileno;
2269 : off_t offset;
2270 :
2271 62 : if (!am_parallel_apply_worker())
2272 54 : maybe_start_skipping_changes(lsn);
2273 :
2274 : /* Make sure we have an open transaction */
2275 62 : begin_replication_step();
2276 :
2277 : /*
2278 : * Allocate file handle and memory required to process all the messages in
2279 : * TopTransactionContext to avoid them getting reset after each message is
2280 : * processed.
2281 : */
2282 62 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2283 :
2284 : /* Open the spool file for the committed/prepared transaction */
2285 62 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2286 62 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2287 :
2288 : /*
2289 : * Make sure the file is owned by the toplevel transaction so that the
2290 : * file will not be accidentally closed when aborting a subtransaction.
2291 : */
2292 62 : oldowner = CurrentResourceOwner;
2293 62 : CurrentResourceOwner = TopTransactionResourceOwner;
2294 :
2295 62 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2296 :
2297 62 : CurrentResourceOwner = oldowner;
2298 :
2299 62 : buffer = palloc(BLCKSZ);
2300 :
2301 62 : MemoryContextSwitchTo(oldcxt);
2302 :
2303 62 : remote_final_lsn = lsn;
2304 :
2305 : /*
2306 : * Make sure the handle apply_dispatch methods are aware we're in a remote
2307 : * transaction.
2308 : */
2309 62 : in_remote_transaction = true;
2310 62 : pgstat_report_activity(STATE_RUNNING, NULL);
2311 :
2312 62 : end_replication_step();
2313 :
2314 : /*
2315 : * Read the entries one by one and pass them through the same logic as in
2316 : * apply_dispatch.
2317 : */
2318 62 : nchanges = 0;
2319 : while (true)
2320 176940 : {
2321 : StringInfoData s2;
2322 : size_t nbytes;
2323 : int len;
2324 :
2325 177002 : CHECK_FOR_INTERRUPTS();
2326 :
2327 : /* read length of the on-disk record */
2328 177002 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2329 :
2330 : /* have we reached end of the file? */
2331 177002 : if (nbytes == 0)
2332 52 : break;
2333 :
2334 : /* do we have a correct length? */
2335 176950 : if (len <= 0)
2336 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2337 : len, path);
2338 :
2339 : /* make sure we have sufficiently large buffer */
2340 176950 : buffer = repalloc(buffer, len);
2341 :
2342 : /* and finally read the data into the buffer */
2343 176950 : BufFileReadExact(stream_fd, buffer, len);
2344 :
2345 176950 : BufFileTell(stream_fd, &fileno, &offset);
2346 :
2347 : /* init a stringinfo using the buffer and call apply_dispatch */
2348 176950 : initReadOnlyStringInfo(&s2, buffer, len);
2349 :
2350 : /* Ensure we are reading the data into our memory context. */
2351 176950 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2352 :
2353 176950 : apply_dispatch(&s2);
2354 :
2355 176948 : MemoryContextReset(ApplyMessageContext);
2356 :
2357 176948 : MemoryContextSwitchTo(oldcxt);
2358 :
2359 176948 : nchanges++;
2360 :
2361 : /*
2362 : * It is possible the file has been closed because we have processed
2363 : * the transaction end message like stream_commit in which case that
2364 : * must be the last message.
2365 : */
2366 176948 : if (!stream_fd)
2367 : {
2368 8 : ensure_last_message(stream_fileset, xid, fileno, offset);
2369 8 : break;
2370 : }
2371 :
2372 176940 : if (nchanges % 1000 == 0)
2373 168 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2374 : nchanges, path);
2375 : }
2376 :
2377 60 : if (stream_fd)
2378 52 : stream_close_file();
2379 :
2380 60 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2381 : nchanges, path);
2382 :
2383 60 : return;
2384 : }
2385 :
2386 : /*
2387 : * Handle STREAM COMMIT message.
2388 : */
2389 : static void
2390 122 : apply_handle_stream_commit(StringInfo s)
2391 : {
2392 : TransactionId xid;
2393 : LogicalRepCommitData commit_data;
2394 : ParallelApplyWorkerInfo *winfo;
2395 : TransApplyAction apply_action;
2396 :
2397 : /* Save the message before it is consumed. */
2398 122 : StringInfoData original_msg = *s;
2399 :
2400 122 : if (in_streamed_transaction)
2401 0 : ereport(ERROR,
2402 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2403 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2404 :
2405 122 : xid = logicalrep_read_stream_commit(s, &commit_data);
2406 122 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2407 :
2408 122 : apply_action = get_transaction_apply_action(xid, &winfo);
2409 :
2410 122 : switch (apply_action)
2411 : {
2412 44 : case TRANS_LEADER_APPLY:
2413 :
2414 : /*
2415 : * The transaction has been serialized to file, so replay all the
2416 : * spooled operations.
2417 : */
2418 44 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2419 : commit_data.commit_lsn);
2420 :
2421 42 : apply_handle_commit_internal(&commit_data);
2422 :
2423 : /* Unlink the files with serialized changes and subxact info. */
2424 42 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2425 :
2426 42 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2427 42 : break;
2428 :
2429 36 : case TRANS_LEADER_SEND_TO_PARALLEL:
2430 : Assert(winfo);
2431 :
2432 36 : if (pa_send_data(winfo, s->len, s->data))
2433 : {
2434 : /* Finish processing the streaming transaction. */
2435 36 : pa_xact_finish(winfo, commit_data.end_lsn);
2436 34 : break;
2437 : }
2438 :
2439 : /*
2440 : * Switch to serialize mode when we are not able to send the
2441 : * change to parallel apply worker.
2442 : */
2443 0 : pa_switch_to_partial_serialize(winfo, true);
2444 :
2445 : /* fall through */
2446 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2447 : Assert(winfo);
2448 :
2449 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2450 : &original_msg);
2451 :
2452 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2453 :
2454 : /* Finish processing the streaming transaction. */
2455 4 : pa_xact_finish(winfo, commit_data.end_lsn);
2456 4 : break;
2457 :
2458 38 : case TRANS_PARALLEL_APPLY:
2459 :
2460 : /*
2461 : * If the parallel apply worker is applying spooled messages then
2462 : * close the file before committing.
2463 : */
2464 38 : if (stream_fd)
2465 4 : stream_close_file();
2466 :
2467 38 : apply_handle_commit_internal(&commit_data);
2468 :
2469 38 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2470 :
2471 : /*
2472 : * It is important to set the transaction state as finished before
2473 : * releasing the lock. See pa_wait_for_xact_finish.
2474 : */
2475 38 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2476 38 : pa_unlock_transaction(xid, AccessExclusiveLock);
2477 :
2478 38 : pa_reset_subtrans();
2479 :
2480 38 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2481 38 : break;
2482 :
2483 0 : default:
2484 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2485 : break;
2486 : }
2487 :
2488 : /*
2489 : * Process any tables that are being synchronized in parallel, as well as
2490 : * any newly added tables or sequences.
2491 : */
2492 118 : ProcessSyncingRelations(commit_data.end_lsn);
2493 :
2494 118 : pgstat_report_activity(STATE_IDLE, NULL);
2495 :
2496 118 : reset_apply_error_context_info();
2497 118 : }
2498 :
2499 : /*
2500 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2501 : */
2502 : static void
2503 950 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2504 : {
2505 950 : if (is_skipping_changes())
2506 : {
2507 4 : stop_skipping_changes();
2508 :
2509 : /*
2510 : * Start a new transaction to clear the subskiplsn, if not started
2511 : * yet.
2512 : */
2513 4 : if (!IsTransactionState())
2514 2 : StartTransactionCommand();
2515 : }
2516 :
2517 950 : if (IsTransactionState())
2518 : {
2519 : /*
2520 : * The transaction is either non-empty or skipped, so we clear the
2521 : * subskiplsn.
2522 : */
2523 950 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2524 :
2525 : /*
2526 : * Update origin state so we can restart streaming from correct
2527 : * position in case of crash.
2528 : */
2529 950 : replorigin_session_origin_lsn = commit_data->end_lsn;
2530 950 : replorigin_session_origin_timestamp = commit_data->committime;
2531 :
2532 950 : CommitTransactionCommand();
2533 :
2534 950 : if (IsTransactionBlock())
2535 : {
2536 8 : EndTransactionBlock(false);
2537 8 : CommitTransactionCommand();
2538 : }
2539 :
2540 950 : pgstat_report_stat(false);
2541 :
2542 950 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2543 : }
2544 : else
2545 : {
2546 : /* Process any invalidation messages that might have accumulated. */
2547 0 : AcceptInvalidationMessages();
2548 0 : maybe_reread_subscription();
2549 : }
2550 :
2551 950 : in_remote_transaction = false;
2552 950 : }
2553 :
2554 : /*
2555 : * Handle RELATION message.
2556 : *
2557 : * Note we don't do validation against local schema here. The validation
2558 : * against local schema is postponed until first change for given relation
2559 : * comes as we only care about it when applying changes for it anyway and we
2560 : * do less locking this way.
2561 : */
2562 : static void
2563 962 : apply_handle_relation(StringInfo s)
2564 : {
2565 : LogicalRepRelation *rel;
2566 :
2567 962 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2568 72 : return;
2569 :
2570 890 : rel = logicalrep_read_rel(s);
2571 890 : logicalrep_relmap_update(rel);
2572 :
2573 : /* Also reset all entries in the partition map that refer to remoterel. */
2574 890 : logicalrep_partmap_reset_relmap(rel);
2575 : }
2576 :
2577 : /*
2578 : * Handle TYPE message.
2579 : *
2580 : * This implementation pays no attention to TYPE messages; we expect the user
2581 : * to have set things up so that the incoming data is acceptable to the input
2582 : * functions for the locally subscribed tables. Hence, we just read and
2583 : * discard the message.
2584 : */
2585 : static void
2586 36 : apply_handle_type(StringInfo s)
2587 : {
2588 : LogicalRepTyp typ;
2589 :
2590 36 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2591 0 : return;
2592 :
2593 36 : logicalrep_read_typ(s, &typ);
2594 : }
2595 :
2596 : /*
2597 : * Check that we (the subscription owner) have sufficient privileges on the
2598 : * target relation to perform the given operation.
2599 : */
2600 : static void
2601 440896 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2602 : {
2603 : Oid relid;
2604 : AclResult aclresult;
2605 :
2606 440896 : relid = RelationGetRelid(rel);
2607 440896 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2608 440896 : if (aclresult != ACLCHECK_OK)
2609 20 : aclcheck_error(aclresult,
2610 20 : get_relkind_objtype(rel->rd_rel->relkind),
2611 20 : get_rel_name(relid));
2612 :
2613 : /*
2614 : * We lack the infrastructure to honor RLS policies. It might be possible
2615 : * to add such infrastructure here, but tablesync workers lack it, too, so
2616 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2617 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2618 : * replicate subsequent INSERTs, so we forbid all commands the same.
2619 : */
2620 440876 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2621 8 : ereport(ERROR,
2622 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2623 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2624 : GetUserNameFromId(GetUserId(), true),
2625 : RelationGetRelationName(rel))));
2626 440868 : }
2627 :
2628 : /*
2629 : * Handle INSERT message.
 *
 * Returns without doing anything further when changes are currently being
 * skipped or when handle_streamed_transaction() reports that it took care of
 * the message.  Otherwise the new tuple is read from 's', the target
 * relation is opened with RowExclusiveLock, and the tuple is inserted either
 * directly (apply_handle_insert_internal) or, when the local target is a
 * partitioned table, through apply_handle_tuple_routing().
 *
 * The work between begin_replication_step() and end_replication_step() runs
 * as the table owner unless the subscription's runasowner flag is set.
2630 : */
2631 :
2632 : static void
2633 371962 : apply_handle_insert(StringInfo s)
2634 : {
2635 : LogicalRepRelMapEntry *rel;
2636 : LogicalRepTupleData newtup;
2637 : LogicalRepRelId relid;
2638 : UserContext ucxt;
2639 : ApplyExecutionData *edata;
2640 : EState *estate;
2641 : TupleTableSlot *remoteslot;
2642 : MemoryContext oldctx;
2643 : bool run_as_owner;
2644 :
2645 : /*
2646 : * Quick return if we are skipping data modification changes or handling
2647 : * streamed transactions.
2648 : */
2649 723922 : if (is_skipping_changes() ||
2650 351960 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2651 220114 : return;
2652 :
2653 151948 : begin_replication_step();
2654 :
2655 151944 : relid = logicalrep_read_insert(s, &newtup);
2656 151944 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2657 151926 : if (!should_apply_changes_for_rel(rel))
2658 : {
2659 : /*
2660 : * The relation can't become interesting in the middle of the
2661 : * transaction so it's safe to unlock it.
2662 : */
2663 100 : logicalrep_rel_close(rel, RowExclusiveLock);
2664 100 : end_replication_step();
2665 100 : return;
2666 : }
2667 :
2668 : /*
2669 : * Make sure that any user-supplied code runs as the table owner, unless
2670 : * the user has opted out of that behavior.
2671 : */
2672 151826 : run_as_owner = MySubscription->runasowner;
2673 151826 : if (!run_as_owner)
2674 151808 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2675 :
2676 : /* Set relation for error callback */
2677 151826 : apply_error_callback_arg.rel = rel;
2678 :
2679 : /* Initialize the executor state. */
2680 151826 : edata = create_edata_for_relation(rel);
2681 151826 : estate = edata->estate;
2682 151826 : remoteslot = ExecInitExtraTupleSlot(estate,
2683 151826 : RelationGetDescr(rel->localrel),
2684 : &TTSOpsVirtual);
2685 :
2686 : /* Process and store remote tuple in the slot */
/*
 * The slot is filled while the executor's per-tuple memory context is
 * current; the previous context is restored immediately afterwards.
 */
2687 151826 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2688 151826 : slot_store_data(remoteslot, rel, &newtup);
2689 151826 : slot_fill_defaults(rel, estate, remoteslot);
2690 151826 : MemoryContextSwitchTo(oldctx);
2691 :
2692 : /* For a partitioned table, insert the tuple into a partition. */
2693 151826 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2694 144 : apply_handle_tuple_routing(edata,
2695 : remoteslot, NULL, CMD_INSERT);
2696 : else
2697 : {
2698 151682 : ResultRelInfo *relinfo = edata->targetRelInfo;
2699 :
/*
 * apply_handle_insert_internal() expects its caller to have opened the
 * indexes, so open and close them around the call.
 */
2700 151682 : ExecOpenIndices(relinfo, false);
2701 151682 : apply_handle_insert_internal(edata, relinfo, remoteslot);
2702 151650 : ExecCloseIndices(relinfo);
2703 : }
2704 :
2705 151738 : finish_edata(edata);
2706 :
2707 : /* Reset relation for error callback */
2708 151738 : apply_error_callback_arg.rel = NULL;
2709 :
2710 151738 : if (!run_as_owner)
2711 151728 : RestoreUserContext(&ucxt);
2712 :
/*
 * Close with NoLock here, unlike the early-exit path above which passes
 * RowExclusiveLock.  NOTE(review): presumably this keeps the lock until the
 * transaction ends -- confirm against logicalrep_rel_close().
 */
2713 151738 : logicalrep_rel_close(rel, NoLock);
2714 :
2715 151738 : end_replication_step();
2716 : }
2717 :
2718 : /*
2719 : * Workhorse for apply_handle_insert()
2720 : * relinfo is for the relation we're actually inserting into
2721 : * (could be a child partition of edata->targetRelInfo)
2722 : */
2723 : static void
2724 151828 : apply_handle_insert_internal(ApplyExecutionData *edata,
2725 : ResultRelInfo *relinfo,
2726 : TupleTableSlot *remoteslot)
2727 : {
2728 151828 : EState *estate = edata->estate;
2729 :
2730 : /* Caller should have opened indexes already. */
2731 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2732 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2733 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2734 :
2735 : /* Caller will not have done this bit. */
2736 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2737 151828 : InitConflictIndexes(relinfo);
2738 :
2739 : /* Do the insert. */
2740 151828 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2741 151812 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2742 151740 : }
2743 :
2744 : /*
2745 : * Check if the logical replication relation is updatable and throw
2746 : * appropriate error if it isn't.
2747 : */
2748 : static void
2749 144584 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2750 : {
2751 : /*
2752 : * For partitioned tables, we only need to care if the target partition is
2753 : * updatable (aka has PK or RI defined for it).
2754 : */
2755 144584 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2756 60 : return;
2757 :
2758 : /* Updatable, no error. */
2759 144524 : if (rel->updatable)
2760 144524 : return;
2761 :
2762 : /*
2763 : * We are in error mode so it's fine this is somewhat slow. It's better to
2764 : * give user correct error.
2765 : */
2766 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2767 : {
2768 0 : ereport(ERROR,
2769 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2770 : errmsg("publisher did not send replica identity column "
2771 : "expected by the logical replication target relation \"%s.%s\"",
2772 : rel->remoterel.nspname, rel->remoterel.relname)));
2773 : }
2774 :
2775 0 : ereport(ERROR,
2776 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2777 : errmsg("logical replication target relation \"%s.%s\" has "
2778 : "neither REPLICA IDENTITY index nor PRIMARY "
2779 : "KEY and published relation does not have "
2780 : "REPLICA IDENTITY FULL",
2781 : rel->remoterel.nspname, rel->remoterel.relname)));
2782 : }
2783 :
2784 : /*
2785 : * Handle UPDATE message.
 *
 * Returns without doing anything further when changes are currently being
 * skipped or when handle_streamed_transaction() reports that it took care of
 * the message.  Otherwise the message is decoded into an optional old tuple
 * and a new tuple, the target relation is opened with RowExclusiveLock and
 * verified to be updatable, and the update is applied either directly
 * (apply_handle_update_internal) or, when the local target is a partitioned
 * table, through apply_handle_tuple_routing().
 *
 * The "search tuple" stored in the remote slot is the old tuple when the
 * message carried one, and the new tuple otherwise.
2786 : *
2787 : * TODO: FDW support
2788 : */
2789 : static void
2790 132330 : apply_handle_update(StringInfo s)
2791 : {
2792 : LogicalRepRelMapEntry *rel;
2793 : LogicalRepRelId relid;
2794 : UserContext ucxt;
2795 : ApplyExecutionData *edata;
2796 : EState *estate;
2797 : LogicalRepTupleData oldtup;
2798 : LogicalRepTupleData newtup;
2799 : bool has_oldtup;
2800 : TupleTableSlot *remoteslot;
2801 : RTEPermissionInfo *target_perminfo;
2802 : MemoryContext oldctx;
2803 : bool run_as_owner;
2804 :
2805 : /*
2806 : * Quick return if we are skipping data modification changes or handling
2807 : * streamed transactions.
2808 : */
2809 264654 : if (is_skipping_changes() ||
2810 132324 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
2811 68446 : return;
2812 :
2813 63884 : begin_replication_step();
2814 :
2815 63882 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2816 : &newtup);
2817 63882 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2818 63882 : if (!should_apply_changes_for_rel(rel))
2819 : {
2820 : /*
2821 : * The relation can't become interesting in the middle of the
2822 : * transaction so it's safe to unlock it.
2823 : */
2824 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2825 0 : end_replication_step();
2826 0 : return;
2827 : }
2828 :
2829 : /* Set relation for error callback */
2830 63882 : apply_error_callback_arg.rel = rel;
2831 :
2832 : /* Check if we can do the update. */
2833 63882 : check_relation_updatable(rel);
2834 :
2835 : /*
2836 : * Make sure that any user-supplied code runs as the table owner, unless
2837 : * the user has opted out of that behavior.
2838 : */
2839 63882 : run_as_owner = MySubscription->runasowner;
2840 63882 : if (!run_as_owner)
2841 63874 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2842 :
2843 : /* Initialize the executor state. */
2844 63880 : edata = create_edata_for_relation(rel);
2845 63880 : estate = edata->estate;
2846 63880 : remoteslot = ExecInitExtraTupleSlot(estate,
2847 63880 : RelationGetDescr(rel->localrel),
2848 : &TTSOpsVirtual);
2849 :
2850 : /*
2851 : * Populate updatedCols so that per-column triggers can fire, and so
2852 : * executor can correctly pass down indexUnchanged hint. This could
2853 : * include more columns than were actually changed on the publisher
2854 : * because the logical replication protocol doesn't contain that
2855 : * information. But it would for example exclude columns that only exist
2856 : * on the subscriber, since we are not touching those.
2857 : */
/*
 * NOTE(review): entry 0 of es_rteperminfos is presumably the permission
 * info create_edata_for_relation() registered for the target relation --
 * confirm there.
 */
2858 63880 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
2859 318642 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2860 : {
2861 254762 : CompactAttribute *att = TupleDescCompactAttr(remoteslot->tts_tupleDescriptor, i);
2862 254762 : int remoteattnum = rel->attrmap->attnums[i];
2863 :
/*
 * Skip dropped local columns and columns whose attrmap entry is negative;
 * of the rest, record every column the message did not mark as unchanged.
 */
2864 254762 : if (!att->attisdropped && remoteattnum >= 0)
2865 : {
2866 : Assert(remoteattnum < newtup.ncols);
2867 137728 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2868 137722 : target_perminfo->updatedCols =
2869 137722 : bms_add_member(target_perminfo->updatedCols,
2870 : i + 1 - FirstLowInvalidHeapAttributeNumber);
2871 : }
2872 : }
2873 :
2874 : /* Build the search tuple. */
/* Use the old tuple if the message carried one, else the new tuple. */
2875 63880 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2876 63880 : slot_store_data(remoteslot, rel,
2877 63880 : has_oldtup ? &oldtup : &newtup);
2878 63880 : MemoryContextSwitchTo(oldctx);
2879 :
2880 : /* For a partitioned table, apply update to correct partition. */
2881 63880 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2882 26 : apply_handle_tuple_routing(edata,
2883 : remoteslot, &newtup, CMD_UPDATE);
2884 : else
2885 63854 : apply_handle_update_internal(edata, edata->targetRelInfo,
2886 : remoteslot, &newtup, rel->localindexoid);
2887 :
2888 63862 : finish_edata(edata);
2889 :
2890 : /* Reset relation for error callback */
2891 63862 : apply_error_callback_arg.rel = NULL;
2892 :
2893 63862 : if (!run_as_owner)
2894 63858 : RestoreUserContext(&ucxt);
2895 :
/*
 * Close with NoLock here, unlike the early-exit path above which passes
 * RowExclusiveLock.
 */
2896 63862 : logicalrep_rel_close(rel, NoLock);
2897 :
2898 63862 : end_replication_step();
2899 : }
2900 :
2901 : /*
2902 : * Workhorse for apply_handle_update()
2903 : * relinfo is for the relation we're actually updating in
2904 : * (could be a child partition of edata->targetRelInfo)
 *
 * 'remoteslot' holds the search tuple used to locate the local row,
 * 'newtup' the new column data from the publisher, and 'localindexoid' the
 * index (if any) handed to FindReplTupleInLocalRel() for the lookup.
 *
 * If the local row is found it is updated, after an update_origin_differs
 * conflict has been logged when the row was last modified by a different
 * origin.  If it is not found, nothing is changed and either an
 * update_deleted or an update_missing conflict is logged.
 *
 * Unlike the insert and delete workhorses, this function opens and closes
 * the relation's indexes itself.
2905 : */
2906 : static void
2907 63854 : apply_handle_update_internal(ApplyExecutionData *edata,
2908 : ResultRelInfo *relinfo,
2909 : TupleTableSlot *remoteslot,
2910 : LogicalRepTupleData *newtup,
2911 : Oid localindexoid)
2912 : {
2913 63854 : EState *estate = edata->estate;
2914 63854 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2915 63854 : Relation localrel = relinfo->ri_RelationDesc;
2916 : EPQState epqstate;
2917 63854 : TupleTableSlot *localslot = NULL;
2918 63854 : ConflictTupleInfo conflicttuple = {0};
2919 : bool found;
2920 : MemoryContext oldctx;
2921 :
2922 63854 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2923 63854 : ExecOpenIndices(relinfo, false);
2924 :
2925 63854 : found = FindReplTupleInLocalRel(edata, localrel,
2926 : &relmapentry->remoterel,
2927 : localindexoid,
2928 : remoteslot, &localslot);
2929 :
2930 : /*
2931 : * Tuple found.
2932 : *
2933 : * Note this will fail if there are other conflicting unique indexes.
2934 : */
2935 63842 : if (found)
2936 : {
2937 : /*
2938 : * Report the conflict if the tuple was modified by a different
2939 : * origin.
2940 : */
2941 63828 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2942 4 : &conflicttuple.origin, &conflicttuple.ts) &&
2943 4 : conflicttuple.origin != replorigin_session_origin)
2944 : {
2945 : TupleTableSlot *newslot;
2946 :
2947 : /* Store the new tuple for conflict reporting */
2948 4 : newslot = table_slot_create(localrel, &estate->es_tupleTable);
2949 4 : slot_store_data(newslot, relmapentry, newtup);
2950 :
2951 4 : conflicttuple.slot = localslot;
2952 :
/* Reported at LOG level only; the update below still proceeds. */
2953 4 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
2954 : remoteslot, newslot,
2955 4 : list_make1(&conflicttuple));
2956 : }
2957 :
2958 : /* Process and store remote tuple in the slot */
/*
 * From here on remoteslot no longer holds the search tuple: it is
 * overwritten from the local tuple and the new column data.
 */
2959 63828 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2960 63828 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2961 63828 : MemoryContextSwitchTo(oldctx);
2962 :
2963 63828 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2964 :
2965 63828 : InitConflictIndexes(relinfo);
2966 :
2967 : /* Do the actual update. */
2968 63828 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2969 63828 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2970 : remoteslot);
2971 : }
2972 : else
2973 : {
2974 : ConflictType type;
/*
 * No local tuple was found, so the slot that FindReplTupleInLocalRel()
 * created for it is reused to hold the new tuple for the conflict report.
 */
2975 14 : TupleTableSlot *newslot = localslot;
2976 :
2977 : /*
2978 : * Detecting whether the tuple was recently deleted or never existed
2979 : * is crucial to avoid misleading the user during conflict handling.
2980 : */
2981 14 : if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
2982 : &conflicttuple.xmin,
2983 : &conflicttuple.origin,
2984 6 : &conflicttuple.ts) &&
2985 6 : conflicttuple.origin != replorigin_session_origin)
2986 6 : type = CT_UPDATE_DELETED;
2987 : else
2988 8 : type = CT_UPDATE_MISSING;
2989 :
2990 : /* Store the new tuple for conflict reporting */
2991 14 : slot_store_data(newslot, relmapentry, newtup);
2992 :
2993 : /*
2994 : * The tuple to be updated could not be found or was deleted. Do
2995 : * nothing except for emitting a log message.
2996 : */
2997 14 : ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
2998 14 : list_make1(&conflicttuple));
2999 : }
3000 :
3001 : /* Cleanup. */
3002 63836 : ExecCloseIndices(relinfo);
3003 63836 : EvalPlanQualEnd(&epqstate);
3004 63836 : }
3005 :
3006 : /*
3007 : * Handle DELETE message.
 *
 * Returns without doing anything further when changes are currently being
 * skipped or when handle_streamed_transaction() reports that it took care of
 * the message.  Otherwise the old tuple is read from 's', the target
 * relation is opened with RowExclusiveLock and verified to be updatable, and
 * the delete is applied either directly (apply_handle_delete_internal) or,
 * when the local target is a partitioned table, through
 * apply_handle_tuple_routing().
3008 : *
3009 : * TODO: FDW support
3010 : */
3011 : static void
3012 163872 : apply_handle_delete(StringInfo s)
3013 : {
3014 : LogicalRepRelMapEntry *rel;
3015 : LogicalRepTupleData oldtup;
3016 : LogicalRepRelId relid;
3017 : UserContext ucxt;
3018 : ApplyExecutionData *edata;
3019 : EState *estate;
3020 : TupleTableSlot *remoteslot;
3021 : MemoryContext oldctx;
3022 : bool run_as_owner;
3023 :
3024 : /*
3025 : * Quick return if we are skipping data modification changes or handling
3026 : * streamed transactions.
3027 : */
3028 327744 : if (is_skipping_changes() ||
3029 163872 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
3030 83230 : return;
3031 :
3032 80642 : begin_replication_step();
3033 :
3034 80642 : relid = logicalrep_read_delete(s, &oldtup);
3035 80642 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
3036 80642 : if (!should_apply_changes_for_rel(rel))
3037 : {
3038 : /*
3039 : * The relation can't become interesting in the middle of the
3040 : * transaction so it's safe to unlock it.
3041 : */
3042 0 : logicalrep_rel_close(rel, RowExclusiveLock);
3043 0 : end_replication_step();
3044 0 : return;
3045 : }
3046 :
3047 : /* Set relation for error callback */
3048 80642 : apply_error_callback_arg.rel = rel;
3049 :
3050 : /* Check if we can do the delete. */
3051 80642 : check_relation_updatable(rel);
3052 :
3053 : /*
3054 : * Make sure that any user-supplied code runs as the table owner, unless
3055 : * the user has opted out of that behavior.
3056 : */
3057 80642 : run_as_owner = MySubscription->runasowner;
3058 80642 : if (!run_as_owner)
3059 80638 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3060 :
3061 : /* Initialize the executor state. */
3062 80642 : edata = create_edata_for_relation(rel);
3063 80642 : estate = edata->estate;
3064 80642 : remoteslot = ExecInitExtraTupleSlot(estate,
3065 80642 : RelationGetDescr(rel->localrel),
3066 : &TTSOpsVirtual);
3067 :
3068 : /* Build the search tuple. */
/* For DELETE the search tuple is always the old tuple from the message. */
3069 80642 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3070 80642 : slot_store_data(remoteslot, rel, &oldtup);
3071 80642 : MemoryContextSwitchTo(oldctx);
3072 :
3073 : /* For a partitioned table, apply delete to correct partition. */
3074 80642 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3075 34 : apply_handle_tuple_routing(edata,
3076 : remoteslot, NULL, CMD_DELETE);
3077 : else
3078 : {
3079 80608 : ResultRelInfo *relinfo = edata->targetRelInfo;
3080 :
/*
 * apply_handle_delete_internal() expects its caller to have opened the
 * indexes, so open and close them around the call.
 */
3081 80608 : ExecOpenIndices(relinfo, false);
3082 80608 : apply_handle_delete_internal(edata, relinfo,
3083 : remoteslot, rel->localindexoid);
3084 80608 : ExecCloseIndices(relinfo);
3085 : }
3086 :
3087 80642 : finish_edata(edata);
3088 :
3089 : /* Reset relation for error callback */
3090 80642 : apply_error_callback_arg.rel = NULL;
3091 :
3092 80642 : if (!run_as_owner)
3093 80638 : RestoreUserContext(&ucxt);
3094 :
/*
 * Close with NoLock here, unlike the early-exit path above which passes
 * RowExclusiveLock.
 */
3095 80642 : logicalrep_rel_close(rel, NoLock);
3096 :
3097 80642 : end_replication_step();
3098 : }
3099 :
3100 : /*
3101 : * Workhorse for apply_handle_delete()
3102 : * relinfo is for the relation we're actually deleting from
3103 : * (could be a child partition of edata->targetRelInfo)
3104 : */
3105 : static void
3106 80642 : apply_handle_delete_internal(ApplyExecutionData *edata,
3107 : ResultRelInfo *relinfo,
3108 : TupleTableSlot *remoteslot,
3109 : Oid localindexoid)
3110 : {
3111 80642 : EState *estate = edata->estate;
3112 80642 : Relation localrel = relinfo->ri_RelationDesc;
3113 80642 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3114 : EPQState epqstate;
3115 : TupleTableSlot *localslot;
3116 80642 : ConflictTupleInfo conflicttuple = {0};
3117 : bool found;
3118 :
3119 80642 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3120 :
3121 : /* Caller should have opened indexes already. */
3122 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
3123 : !localrel->rd_rel->relhasindex ||
3124 : RelationGetIndexList(localrel) == NIL);
3125 :
3126 80642 : found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3127 : remoteslot, &localslot);
3128 :
3129 : /* If found delete it. */
3130 80642 : if (found)
3131 : {
3132 : /*
3133 : * Report the conflict if the tuple was modified by a different
3134 : * origin.
3135 : */
3136 80624 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3137 12 : &conflicttuple.origin, &conflicttuple.ts) &&
3138 12 : conflicttuple.origin != replorigin_session_origin)
3139 : {
3140 10 : conflicttuple.slot = localslot;
3141 10 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
3142 : remoteslot, NULL,
3143 10 : list_make1(&conflicttuple));
3144 : }
3145 :
3146 80624 : EvalPlanQualSetSlot(&epqstate, localslot);
3147 :
3148 : /* Do the actual delete. */
3149 80624 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3150 80624 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3151 : }
3152 : else
3153 : {
3154 : /*
3155 : * The tuple to be deleted could not be found. Do nothing except for
3156 : * emitting a log message.
3157 : */
3158 18 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
3159 18 : remoteslot, NULL, list_make1(&conflicttuple));
3160 : }
3161 :
3162 : /* Cleanup. */
3163 80642 : EvalPlanQualEnd(&epqstate);
3164 80642 : }
3165 :
3166 : /*
3167 : * Try to find a tuple received from the publication side (in 'remoteslot') in
3168 : * the corresponding local relation using either replica identity index,
3169 : * primary key, index or if needed, sequential scan.
3170 : *
3171 : * Local tuple, if found, is returned in '*localslot'.
3172 : */
3173 : static bool
3174 144522 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
3175 : LogicalRepRelation *remoterel,
3176 : Oid localidxoid,
3177 : TupleTableSlot *remoteslot,
3178 : TupleTableSlot **localslot)
3179 : {
3180 144522 : EState *estate = edata->estate;
3181 : bool found;
3182 :
3183 : /*
3184 : * Regardless of the top-level operation, we're performing a read here, so
3185 : * check for SELECT privileges.
3186 : */
3187 144522 : TargetPrivilegesCheck(localrel, ACL_SELECT);
3188 :
3189 144510 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3190 :
3191 : Assert(OidIsValid(localidxoid) ||
3192 : (remoterel->replident == REPLICA_IDENTITY_FULL));
3193 :
3194 144510 : if (OidIsValid(localidxoid))
3195 : {
3196 : #ifdef USE_ASSERT_CHECKING
3197 : Relation idxrel = index_open(localidxoid, AccessShareLock);
3198 :
3199 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3200 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
3201 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
3202 : IsIndexUsableForReplicaIdentityFull(idxrel,
3203 : edata->targetRel->attrmap)));
3204 : index_close(idxrel, AccessShareLock);
3205 : #endif
3206 :
3207 144208 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
3208 : LockTupleExclusive,
3209 : remoteslot, *localslot);
3210 : }
3211 : else
3212 302 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
3213 : remoteslot, *localslot);
3214 :
3215 144510 : return found;
3216 : }
3217 :
3218 : /*
3219 : * Determine whether the index can reliably locate the deleted tuple in the
3220 : * local relation.
3221 : *
3222 : * An index may exclude deleted tuples if it was re-indexed or re-created during
3223 : * change application. Therefore, an index is considered usable only if the
3224 : * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3225 : * index tuple's xmin. This ensures that any tuples deleted prior to the index
3226 : * creation or re-indexing are not relevant for conflict detection in the
3227 : * current apply worker.
3228 : *
3229 : * Note that indexes may also be excluded if they were modified by other DDL
3230 : * operations, such as ALTER INDEX. However, this is acceptable, as the
3231 : * likelihood of such DDL changes coinciding with the need to scan dead
3232 : * tuples for the update_deleted is low.
3233 : */
3234 : static bool
3235 2 : IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
3236 : TransactionId conflict_detection_xmin)
3237 : {
3238 : HeapTuple index_tuple;
3239 : TransactionId index_xmin;
3240 :
3241 2 : index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
3242 :
3243 2 : if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3244 0 : elog(ERROR, "cache lookup failed for index %u", localindexoid);
3245 :
3246 : /*
3247 : * No need to check for a frozen transaction ID, as
3248 : * TransactionIdPrecedes() manages it internally, treating it as falling
3249 : * behind the conflict_detection_xmin.
3250 : */
3251 2 : index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
3252 :
3253 2 : ReleaseSysCache(index_tuple);
3254 :
3255 2 : return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
3256 : }
3257 :
3258 : /*
3259 : * Attempts to locate a deleted tuple in the local relation that matches the
3260 : * values of the tuple received from the publication side (in 'remoteslot').
3261 : * The search is performed using either the replica identity index, primary
3262 : * key, other available index, or a sequential scan if necessary.
3263 : *
3264 : * Returns true if the deleted tuple is found. If found, the transaction ID,
3265 : * origin, and commit timestamp of the deletion are stored in '*delete_xid',
3266 : * '*delete_origin', and '*delete_time' respectively.
 *
 * Returns false without searching when the subscription does not retain
 * dead tuples, when track_commit_timestamp is off, or when the leader's
 * oldest_nonremovable_xid is invalid.  Raises an error when called from a
 * non-leader worker whose leader apply worker can no longer be found.
3267 : */
3268 : static bool
3269 18 : FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
3270 : TupleTableSlot *remoteslot,
3271 : TransactionId *delete_xid, RepOriginId *delete_origin,
3272 : TimestampTz *delete_time)
3273 : {
3274 : TransactionId oldestxmin;
3275 :
3276 : /*
3277 : * Return false if either dead tuples are not retained or commit timestamp
3278 : * data is not available.
3279 : */
3280 18 : if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
3281 12 : return false;
3282 :
3283 : /*
3284 : * For conflict detection, we use the leader worker's
3285 : * oldest_nonremovable_xid value instead of invoking
3286 : * GetOldestNonRemovableTransactionId() or using the conflict detection
3287 : * slot's xmin. The oldest_nonremovable_xid acts as a threshold to
3288 : * identify tuples that were recently deleted. These deleted tuples are no
3289 : * longer visible to concurrent transactions. However, if a remote update
3290 : * matches such a tuple, we log an update_deleted conflict.
3291 : *
3292 : * While GetOldestNonRemovableTransactionId() and slot.xmin may return
3293 : * transaction IDs older than oldest_nonremovable_xid, for our current
3294 : * purpose, it is acceptable to treat tuples deleted by transactions prior
3295 : * to oldest_nonremovable_xid as update_missing conflicts.
3296 : */
3297 6 : if (am_leader_apply_worker())
3298 : {
/* The leader reads its own value directly, without taking relmutex. */
3299 6 : oldestxmin = MyLogicalRepWorker->oldest_nonremovable_xid;
3300 : }
3301 : else
3302 : {
3303 : LogicalRepWorker *leader;
3304 :
3305 : /*
3306 : * Obtain the information from the leader apply worker as only the
3307 : * leader manages oldest_nonremovable_xid (see
3308 : * maybe_advance_nonremovable_xid() for details).
3309 : */
3310 0 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
3311 0 : leader = logicalrep_worker_find(WORKERTYPE_APPLY,
3312 0 : MyLogicalRepWorker->subid, InvalidOid,
3313 : false);
3314 0 : if (!leader)
3315 : {
/*
 * NOTE(review): LogicalRepWorkerLock is still held when this error is
 * raised; presumably error recovery releases it -- confirm.
 */
3316 0 : ereport(ERROR,
3317 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3318 : errmsg("could not detect conflict as the leader apply worker has exited")));
3319 : }
3320 :
/*
 * Read the leader's value under its relmutex spinlock, then drop the
 * spinlock and the LWLock in reverse order of acquisition.
 */
3321 0 : SpinLockAcquire(&leader->relmutex);
3322 0 : oldestxmin = leader->oldest_nonremovable_xid;
3323 0 : SpinLockRelease(&leader->relmutex);
3324 0 : LWLockRelease(LogicalRepWorkerLock);
3325 : }
3326 :
3327 : /*
3328 : * Return false if the leader apply worker has stopped retaining
3329 : * information for detecting conflicts. This implies that update_deleted
3330 : * can no longer be reliably detected.
3331 : */
3332 6 : if (!TransactionIdIsValid(oldestxmin))
3333 0 : return false;
3334 :
/*
 * Use the index only if one was given and it passes
 * IsIndexUsableForFindingDeletedTuple(); in every other case, including a
 * valid but unusable index, fall back to a sequential scan.
 */
3335 8 : if (OidIsValid(localidxoid) &&
3336 2 : IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
3337 2 : return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
3338 : remoteslot, oldestxmin,
3339 : delete_xid, delete_origin,
3340 : delete_time);
3341 : else
3342 4 : return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
3343 : oldestxmin, delete_xid,
3344 : delete_origin, delete_time);
3345 : }
3346 :
3347 : /*
3348 : * This handles insert, update, delete on a partitioned table.
3349 : */
3350 : static void
3351 204 : apply_handle_tuple_routing(ApplyExecutionData *edata,
3352 : TupleTableSlot *remoteslot,
3353 : LogicalRepTupleData *newtup,
3354 : CmdType operation)
3355 : {
/*
 * Arguments, as used in this function:
 *
 * edata - execution state; supplies the EState, the relation map entry of
 * the target (parent) relation and its ResultRelInfo. Its mtstate and
 * proute fields are filled in here.
 * remoteslot - the "search tuple" that is routed to a partition. Must not
 * be NULL. It is overwritten and reused for routing when an UPDATE
 * moves the row to a different partition.
 * newtup - new column data from the publisher; only read in the
 * CMD_UPDATE case.
 * operation - CMD_INSERT, CMD_UPDATE or CMD_DELETE; any other value raises
 * an error.
 */
3356 204 : EState *estate = edata->estate;
3357 204 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3358 204 : ResultRelInfo *relinfo = edata->targetRelInfo;
3359 204 : Relation parentrel = relinfo->ri_RelationDesc;
3360 : ModifyTableState *mtstate;
3361 : PartitionTupleRouting *proute;
3362 : ResultRelInfo *partrelinfo;
3363 : Relation partrel;
3364 : TupleTableSlot *remoteslot_part;
3365 : TupleConversionMap *map;
3366 : MemoryContext oldctx;
3367 204 : LogicalRepRelMapEntry *part_entry = NULL;
3368 204 : AttrMap *attrmap = NULL;
3369 :
3370 : /* ModifyTableState is needed for ExecFindPartition(). */
3371 204 : edata->mtstate = mtstate = makeNode(ModifyTableState);
3372 204 : mtstate->ps.plan = NULL;
3373 204 : mtstate->ps.state = estate;
3374 204 : mtstate->operation = operation;
3375 204 : mtstate->resultRelInfo = relinfo;
3376 :
3377 : /* ... as is PartitionTupleRouting. */
3378 204 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3379 :
3380 : /*
3381 : * Find the partition to which the "search tuple" belongs.
 *
 * From here until the matching MemoryContextSwitchTo(oldctx) below, the
 * per-tuple memory context is current.
3382 : */
3383 : Assert(remoteslot != NULL);
3384 204 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3385 204 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3386 : remoteslot, estate);
3387 : Assert(partrelinfo != NULL);
3388 204 : partrel = partrelinfo->ri_RelationDesc;
3389 :
3390 : /*
3391 : * Check for supported relkind. We need this since partitions might be of
3392 : * unsupported relkinds; and the set of partitions can change, so checking
3393 : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3394 : */
3395 204 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3396 204 : relmapentry->remoterel.relkind,
3397 204 : get_namespace_name(RelationGetNamespace(partrel)),
3398 204 : RelationGetRelationName(partrel));
3399 :
3400 : /*
3401 : * To perform any of the operations below, the tuple must match the
3402 : * partition's rowtype. Convert if needed or just copy, using a dedicated
3403 : * slot to store the tuple in any case.
 *
 * attrmap stays NULL when no conversion map exists; it is passed to
 * logicalrep_partition_open() below either way.
3404 : */
3405 204 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3406 204 : if (remoteslot_part == NULL)
3407 138 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3408 204 : map = ExecGetRootToChildMap(partrelinfo, estate);
3409 204 : if (map != NULL)
3410 : {
3411 66 : attrmap = map->attrMap;
3412 66 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3413 : remoteslot_part);
3414 : }
3415 : else
3416 : {
3417 138 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3418 138 : slot_getallattrs(remoteslot_part);
3419 : }
3420 204 : MemoryContextSwitchTo(oldctx);
3421 :
3422 : /* Check if we can do the update or delete on the leaf partition. */
/* part_entry remains NULL for CMD_INSERT, which does not use it. */
3423 204 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
3424 : {
3425 60 : part_entry = logicalrep_partition_open(relmapentry, partrel,
3426 : attrmap);
3427 60 : check_relation_updatable(part_entry);
3428 : }
3429 :
3430 204 : switch (operation)
3431 : {
3432 144 : case CMD_INSERT:
3433 144 : apply_handle_insert_internal(edata, partrelinfo,
3434 : remoteslot_part);
3435 88 : break;
3436 :
3437 34 : case CMD_DELETE:
3438 34 : apply_handle_delete_internal(edata, partrelinfo,
3439 : remoteslot_part,
3440 : part_entry->localindexoid);
3441 34 : break;
3442 :
3443 26 : case CMD_UPDATE:
3444 :
3445 : /*
3446 : * For UPDATE, depending on whether or not the updated tuple
3447 : * satisfies the partition's constraint, perform a simple UPDATE
3448 : * of the partition or move the updated tuple into a different
3449 : * suitable partition.
3450 : */
3451 : {
3452 : TupleTableSlot *localslot;
3453 : ResultRelInfo *partrelinfo_new;
3454 : Relation partrel_new;
3455 : bool found;
3456 : EPQState epqstate;
3457 26 : ConflictTupleInfo conflicttuple = {0};
3458 :
3459 : /* Get the matching local tuple from the partition. */
3460 26 : found = FindReplTupleInLocalRel(edata, partrel,
3461 : &part_entry->remoterel,
3462 : part_entry->localindexoid,
3463 : remoteslot_part, &localslot);
3464 26 : if (!found)
3465 : {
3466 : ConflictType type;
/* The slot handed back through localslot is reused to hold the new tuple. */
3467 4 : TupleTableSlot *newslot = localslot;
3468 :
3469 : /*
3470 : * Detecting whether the tuple was recently deleted or
3471 : * never existed is crucial to avoid misleading the user
3472 : * during conflict handling.
 *
 * update_deleted is reported only when a deleted tuple was
 * found and its origin differs from this session's
 * replication origin; otherwise update_missing is reported.
3473 : */
3474 4 : if (FindDeletedTupleInLocalRel(partrel,
3475 : part_entry->localindexoid,
3476 : remoteslot_part,
3477 : &conflicttuple.xmin,
3478 : &conflicttuple.origin,
3479 0 : &conflicttuple.ts) &&
3480 0 : conflicttuple.origin != replorigin_session_origin)
3481 0 : type = CT_UPDATE_DELETED;
3482 : else
3483 4 : type = CT_UPDATE_MISSING;
3484 :
3485 : /* Store the new tuple for conflict reporting */
3486 4 : slot_store_data(newslot, part_entry, newtup);
3487 :
3488 : /*
3489 : * The tuple to be updated could not be found or was
3490 : * deleted. Do nothing except for emitting a log message.
3491 : */
3492 4 : ReportApplyConflict(estate, partrelinfo, LOG,
3493 : type, remoteslot_part, newslot,
3494 4 : list_make1(&conflicttuple));
3495 :
/*
 * EvalPlanQualInit() has not been called yet at this point, so
 * there is no EPQ state to end before returning.
 */
3496 4 : return;
3497 : }
3498 :
3499 : /*
3500 : * Report the conflict if the tuple was modified by a
3501 : * different origin.
 *
 * This is reported at LOG level only; the update is still
 * applied below.
3502 : */
3503 22 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3504 : &conflicttuple.origin,
3505 2 : &conflicttuple.ts) &&
3506 2 : conflicttuple.origin != replorigin_session_origin)
3507 : {
3508 : TupleTableSlot *newslot;
3509 :
3510 : /* Store the new tuple for conflict reporting */
3511 2 : newslot = table_slot_create(partrel, &estate->es_tupleTable);
3512 2 : slot_store_data(newslot, part_entry, newtup);
3513 :
3514 2 : conflicttuple.slot = localslot;
3515 :
3516 2 : ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3517 : remoteslot_part, newslot,
3518 2 : list_make1(&conflicttuple));
3519 : }
3520 :
3521 : /*
3522 : * Apply the update to the local tuple, putting the result in
3523 : * remoteslot_part.
3524 : */
3525 22 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3526 22 : slot_modify_data(remoteslot_part, localslot, part_entry,
3527 : newtup);
3528 22 : MemoryContextSwitchTo(oldctx);
3529 :
/* Paired with EvalPlanQualEnd() at the end of this case. */
3530 22 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3531 :
3532 : /*
3533 : * Does the updated tuple still satisfy the current
3534 : * partition's constraint?
 *
 * The constraint check is skipped entirely when partrel is not
 * marked as a partition (relispartition is false).
3535 : */
3536 44 : if (!partrel->rd_rel->relispartition ||
3537 22 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3538 : false))
3539 : {
3540 : /*
3541 : * Yes, so simply UPDATE the partition. We don't call
3542 : * apply_handle_update_internal() here, which would
3543 : * normally do the following work, to avoid repeating some
3544 : * work already done above to find the local tuple in the
3545 : * partition.
3546 : */
3547 20 : InitConflictIndexes(partrelinfo);
3548 :
3549 20 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3550 20 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3551 : ACL_UPDATE);
3552 20 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3553 : localslot, remoteslot_part);
3554 : }
3555 : else
3556 : {
3557 : /* Move the tuple into the new partition. */
3558 :
3559 : /*
3560 : * New partition will be found using tuple routing, which
3561 : * can only occur via the parent table. We might need to
3562 : * convert the tuple to the parent's rowtype. Note that
3563 : * this is the tuple found in the partition, not the
3564 : * original search tuple received by this function.
 *
 * The caller's remoteslot is overwritten here with the
 * updated tuple.
3565 : */
3566 2 : if (map)
3567 : {
3568 : TupleConversionMap *PartitionToRootMap =
3569 2 : convert_tuples_by_name(RelationGetDescr(partrel),
3570 : RelationGetDescr(parentrel));
3571 :
3572 : remoteslot =
3573 2 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3574 : remoteslot_part, remoteslot);
3575 : }
3576 : else
3577 : {
3578 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3579 0 : slot_getallattrs(remoteslot);
3580 : }
3581 :
3582 : /* Find the new partition. */
3583 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3584 2 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3585 : proute, remoteslot,
3586 : estate);
3587 2 : MemoryContextSwitchTo(oldctx);
3588 : Assert(partrelinfo_new != partrelinfo);
3589 2 : partrel_new = partrelinfo_new->ri_RelationDesc;
3590 :
3591 : /* Check that new partition also has supported relkind. */
3592 2 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3593 2 : relmapentry->remoterel.relkind,
3594 2 : get_namespace_name(RelationGetNamespace(partrel_new)),
3595 2 : RelationGetRelationName(partrel_new));
3596 :
3597 : /* DELETE old tuple found in the old partition. */
3598 2 : EvalPlanQualSetSlot(&epqstate, localslot);
3599 2 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3600 2 : ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3601 :
3602 : /* INSERT new tuple into the new partition. */
3603 :
3604 : /*
3605 : * Convert the replacement tuple to match the destination
3606 : * partition rowtype.
 *
 * From here on, remoteslot_part and map refer to the
 * destination partition rather than the original one.
3607 : */
3608 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3609 2 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3610 2 : if (remoteslot_part == NULL)
3611 2 : remoteslot_part = table_slot_create(partrel_new,
3612 : &estate->es_tupleTable);
3613 2 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
3614 2 : if (map != NULL)
3615 : {
3616 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3617 : remoteslot,
3618 : remoteslot_part);
3619 : }
3620 : else
3621 : {
3622 2 : remoteslot_part = ExecCopySlot(remoteslot_part,
3623 : remoteslot);
/*
 * NOTE(review): this deforms the source slot (remoteslot), whereas
 * the equivalent copy path near the top of this function deforms
 * the destination (remoteslot_part). Possibly remoteslot_part was
 * intended here -- confirm before changing.
 */
3624 2 : slot_getallattrs(remoteslot);
3625 : }
3626 2 : MemoryContextSwitchTo(oldctx);
3627 2 : apply_handle_insert_internal(edata, partrelinfo_new,
3628 : remoteslot_part);
3629 : }
3630 :
3631 22 : EvalPlanQualEnd(&epqstate);
3632 : }
3633 22 : break;
3634 :
3635 0 : default:
3636 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3637 : break;
3638 : }
3639 : }
3640 :
3641 : /*
3642 : * Handle TRUNCATE message.
3643 : *
3644 : * TODO: FDW support
3645 : */
3646 : static void
3647 40 : apply_handle_truncate(StringInfo s)
3648 : {
3649 40 : bool cascade = false;
3650 40 : bool restart_seqs = false;
3651 40 : List *remote_relids = NIL;
3652 40 : List *remote_rels = NIL;
3653 40 : List *rels = NIL;
3654 40 : List *part_rels = NIL;
3655 40 : List *relids = NIL;
3656 40 : List *relids_logged = NIL;
3657 : ListCell *lc;
3658 40 : LOCKMODE lockmode = AccessExclusiveLock;
3659 :
3660 : /*
3661 : * Quick return if we are skipping data modification changes or handling
3662 : * streamed transactions.
3663 : */
3664 80 : if (is_skipping_changes() ||
3665 40 : handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
3666 0 : return;
3667 :
3668 40 : begin_replication_step();
3669 :
3670 40 : remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3671 :
3672 98 : foreach(lc, remote_relids)
3673 : {
3674 58 : LogicalRepRelId relid = lfirst_oid(lc);
3675 : LogicalRepRelMapEntry *rel;
3676 :
3677 58 : rel = logicalrep_rel_open(relid, lockmode);
3678 58 : if (!should_apply_changes_for_rel(rel))
3679 : {
3680 : /*
3681 : * The relation can't become interesting in the middle of the
3682 : * transaction so it's safe to unlock it.
3683 : */
3684 0 : logicalrep_rel_close(rel, lockmode);
3685 0 : continue;
3686 : }
3687 :
3688 58 : remote_rels = lappend(remote_rels, rel);
3689 58 : TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
3690 58 : rels = lappend(rels, rel->localrel);
3691 58 : relids = lappend_oid(relids, rel->localreloid);
3692 58 : if (RelationIsLogicallyLogged(rel->localrel))
3693 2 : relids_logged = lappend_oid(relids_logged, rel->localreloid);
3694 :
3695 : /*
3696 : * Truncate partitions if we got a message to truncate a partitioned
3697 : * table.
3698 : */
3699 58 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3700 : {
3701 : ListCell *child;
3702 8 : List *children = find_all_inheritors(rel->localreloid,
3703 : lockmode,
3704 : NULL);
3705 :
3706 30 : foreach(child, children)
3707 : {
3708 22 : Oid childrelid = lfirst_oid(child);
3709 : Relation childrel;
3710 :
3711 22 : if (list_member_oid(relids, childrelid))
3712 8 : continue;
3713 :
3714 : /* find_all_inheritors already got lock */
3715 14 : childrel = table_open(childrelid, NoLock);
3716 :
3717 : /*
3718 : * Ignore temp tables of other backends. See similar code in
3719 : * ExecuteTruncate().
3720 : */
3721 14 : if (RELATION_IS_OTHER_TEMP(childrel))
3722 : {
3723 0 : table_close(childrel, lockmode);
3724 0 : continue;
3725 : }
3726 :
3727 14 : TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
3728 14 : rels = lappend(rels, childrel);
3729 14 : part_rels = lappend(part_rels, childrel);
3730 14 : relids = lappend_oid(relids, childrelid);
3731 : /* Log this relation only if needed for logical decoding */
3732 14 : if (RelationIsLogicallyLogged(childrel))
3733 0 : relids_logged = lappend_oid(relids_logged, childrelid);
3734 : }
3735 : }
3736 : }
3737 :
3738 : /*
3739 : * Even if we used CASCADE on the upstream primary we explicitly default
3740 : * to replaying changes without further cascading. This might be later
3741 : * changeable with a user specified option.
3742 : *
3743 : * MySubscription->runasowner tells us whether we want to execute
3744 : * replication actions as the subscription owner; the last argument to
3745 : * TruncateGuts tells it whether we want to switch to the table owner.
3746 : * Those are exactly opposite conditions.
3747 : */
3748 40 : ExecuteTruncateGuts(rels,
3749 : relids,
3750 : relids_logged,
3751 : DROP_RESTRICT,
3752 : restart_seqs,
3753 40 : !MySubscription->runasowner);
3754 98 : foreach(lc, remote_rels)
3755 : {
3756 58 : LogicalRepRelMapEntry *rel = lfirst(lc);
3757 :
3758 58 : logicalrep_rel_close(rel, NoLock);
3759 : }
3760 54 : foreach(lc, part_rels)
3761 : {
3762 14 : Relation rel = lfirst(lc);
3763 :
3764 14 : table_close(rel, NoLock);
3765 : }
3766 :
3767 40 : end_replication_step();
3768 : }
3769 :
3770 :
3771 : /*
3772 : * Logical replication protocol message dispatcher.
3773 : */
3774 : void
3775 674766 : apply_dispatch(StringInfo s)
3776 : {
3777 674766 : LogicalRepMsgType action = pq_getmsgbyte(s);
3778 : LogicalRepMsgType saved_command;
3779 :
3780 : /*
3781 : * Set the current command being applied. Since this function can be
3782 : * called recursively when applying spooled changes, save the current
3783 : * command.
3784 : */
3785 674766 : saved_command = apply_error_callback_arg.command;
3786 674766 : apply_error_callback_arg.command = action;
3787 :
3788 674766 : switch (action)
3789 : {
3790 998 : case LOGICAL_REP_MSG_BEGIN:
3791 998 : apply_handle_begin(s);
3792 998 : break;
3793 :
3794 870 : case LOGICAL_REP_MSG_COMMIT:
3795 870 : apply_handle_commit(s);
3796 870 : break;
3797 :
3798 371962 : case LOGICAL_REP_MSG_INSERT:
3799 371962 : apply_handle_insert(s);
3800 371852 : break;
3801 :
3802 132330 : case LOGICAL_REP_MSG_UPDATE:
3803 132330 : apply_handle_update(s);
3804 132308 : break;
3805 :
3806 163872 : case LOGICAL_REP_MSG_DELETE:
3807 163872 : apply_handle_delete(s);
3808 163872 : break;
3809 :
3810 40 : case LOGICAL_REP_MSG_TRUNCATE:
3811 40 : apply_handle_truncate(s);
3812 40 : break;
3813 :
3814 962 : case LOGICAL_REP_MSG_RELATION:
3815 962 : apply_handle_relation(s);
3816 962 : break;
3817 :
3818 36 : case LOGICAL_REP_MSG_TYPE:
3819 36 : apply_handle_type(s);
3820 36 : break;
3821 :
3822 14 : case LOGICAL_REP_MSG_ORIGIN:
3823 14 : apply_handle_origin(s);
3824 14 : break;
3825 :
3826 0 : case LOGICAL_REP_MSG_MESSAGE:
3827 :
3828 : /*
3829 : * Logical replication does not use generic logical messages yet.
3830 : * Although, it could be used by other applications that use this
3831 : * output plugin.
3832 : */
3833 0 : break;
3834 :
3835 1676 : case LOGICAL_REP_MSG_STREAM_START:
3836 1676 : apply_handle_stream_start(s);
3837 1676 : break;
3838 :
3839 1674 : case LOGICAL_REP_MSG_STREAM_STOP:
3840 1674 : apply_handle_stream_stop(s);
3841 1670 : break;
3842 :
3843 76 : case LOGICAL_REP_MSG_STREAM_ABORT:
3844 76 : apply_handle_stream_abort(s);
3845 76 : break;
3846 :
3847 122 : case LOGICAL_REP_MSG_STREAM_COMMIT:
3848 122 : apply_handle_stream_commit(s);
3849 118 : break;
3850 :
3851 32 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3852 32 : apply_handle_begin_prepare(s);
3853 32 : break;
3854 :
3855 30 : case LOGICAL_REP_MSG_PREPARE:
3856 30 : apply_handle_prepare(s);
3857 28 : break;
3858 :
3859 40 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
3860 40 : apply_handle_commit_prepared(s);
3861 40 : break;
3862 :
3863 10 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
3864 10 : apply_handle_rollback_prepared(s);
3865 10 : break;
3866 :
3867 22 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3868 22 : apply_handle_stream_prepare(s);
3869 22 : break;
3870 :
3871 0 : default:
3872 0 : ereport(ERROR,
3873 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3874 : errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3875 : }
3876 :
3877 : /* Reset the current command */
3878 674624 : apply_error_callback_arg.command = saved_command;
3879 674624 : }
3880 :
3881 : /*
3882 : * Figure out which write/flush positions to report to the walsender process.
3883 : *
3884 : * We can't simply report back the last LSN the walsender sent us because the
3885 : * local transaction might not yet be flushed to disk locally. Instead we
3886 : * build a list that associates local with remote LSNs for every commit. When
3887 : * reporting back the flush position to the sender we iterate that list and
3888 : * check which entries on it are already locally flushed. Those we can report
3889 : * as having been flushed.
3890 : *
3891 : * The have_pending_txes is true if there are outstanding transactions that
3892 : * need to be flushed.
3893 : */
3894 : static void
3895 149360 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3896 : bool *have_pending_txes)
3897 : {
3898 : dlist_mutable_iter iter;
3899 149360 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3900 :
3901 149360 : *write = InvalidXLogRecPtr;
3902 149360 : *flush = InvalidXLogRecPtr;
3903 :
3904 150388 : dlist_foreach_modify(iter, &lsn_mapping)
3905 : {
3906 18662 : FlushPosition *pos =
3907 18662 : dlist_container(FlushPosition, node, iter.cur);
3908 :
3909 18662 : *write = pos->remote_end;
3910 :
3911 18662 : if (pos->local_end <= local_flush)
3912 : {
3913 1028 : *flush = pos->remote_end;
3914 1028 : dlist_delete(iter.cur);
3915 1028 : pfree(pos);
3916 : }
3917 : else
3918 : {
3919 : /*
3920 : * Don't want to uselessly iterate over the rest of the list which
3921 : * could potentially be long. Instead get the last element and
3922 : * grab the write position from there.
3923 : */
3924 17634 : pos = dlist_tail_element(FlushPosition, node,
3925 : &lsn_mapping);
3926 17634 : *write = pos->remote_end;
3927 17634 : *have_pending_txes = true;
3928 17634 : return;
3929 : }
3930 : }
3931 :
3932 131726 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3933 : }
3934 :
3935 : /*
3936 : * Store current remote/local lsn pair in the tracking list.
3937 : */
3938 : void
3939 1082 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3940 : {
3941 : FlushPosition *flushpos;
3942 :
3943 : /*
3944 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3945 : * by the leader apply worker.
3946 : */
3947 1082 : if (am_parallel_apply_worker())
3948 38 : return;
3949 :
3950 : /* Need to do this in permanent context */
3951 1044 : MemoryContextSwitchTo(ApplyContext);
3952 :
3953 : /* Track commit lsn */
3954 1044 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3955 1044 : flushpos->local_end = local_lsn;
3956 1044 : flushpos->remote_end = remote_lsn;
3957 :
3958 1044 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3959 1044 : MemoryContextSwitchTo(ApplyMessageContext);
3960 : }
3961 :
3962 :
3963 : /* Update statistics of the worker. */
3964 : static void
3965 393162 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3966 : {
3967 393162 : MyLogicalRepWorker->last_lsn = last_lsn;
3968 393162 : MyLogicalRepWorker->last_send_time = send_time;
3969 393162 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3970 393162 : if (reply)
3971 : {
3972 3318 : MyLogicalRepWorker->reply_lsn = last_lsn;
3973 3318 : MyLogicalRepWorker->reply_time = send_time;
3974 : }
3975 393162 : }
3976 :
3977 : /*
3978 : * Apply main loop.
 *
 * Repeatedly receives messages from the publisher connection, processes
 * them, sends feedback, and then waits for more data, a latch wakeup or a
 * timeout. The loop ends when walrcv_receive() returns a negative length
 * (end of stream), after which streaming is ended with
 * walrcv_endstreaming().
 *
 * 'last_received' is the initial value of the latest received LSN. It is
 * only ever moved forward here, from the LSNs carried by WAL data and
 * keepalive messages.
3979 : */
3980 : static void
3981 844 : LogicalRepApplyLoop(XLogRecPtr last_received)
3982 : {
3983 844 : TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3984 844 : bool ping_sent = false;
3985 : TimeLineID tli;
3986 : ErrorContextCallback errcallback;
3987 844 : RetainDeadTuplesData rdt_data = {0};
3988 :
3989 : /*
3990 : * Init the ApplyMessageContext which we clean up after each replication
3991 : * protocol message.
3992 : */
3993 844 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
3994 : "ApplyMessageContext",
3995 : ALLOCSET_DEFAULT_SIZES);
3996 :
3997 : /*
3998 : * This memory context is used for per-stream data when the streaming mode
3999 : * is enabled. This context is reset on each stream stop.
4000 : */
4001 844 : LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
4002 : "LogicalStreamingContext",
4003 : ALLOCSET_DEFAULT_SIZES);
4004 :
4005 : /* mark as idle, before starting to loop */
4006 844 : pgstat_report_activity(STATE_IDLE, NULL);
4007 :
4008 : /*
4009 : * Push apply error context callback. Fields will be filled while applying
4010 : * a change.
 *
 * The callback is popped again after the loop, on the end-of-stream path.
4011 : */
4012 844 : errcallback.callback = apply_error_callback;
4013 844 : errcallback.previous = error_context_stack;
4014 844 : error_context_stack = &errcallback;
4015 844 : apply_error_context_stack = error_context_stack;
4016 :
4017 : /* This outer loop iterates once per wait. */
4018 : for (;;)
4019 144838 : {
4020 145682 : pgsocket fd = PGINVALID_SOCKET;
4021 : int rc;
4022 : int len;
4023 145682 : char *buf = NULL;
4024 145682 : bool endofstream = false;
4025 : long wait_time;
4026 :
4027 145682 : CHECK_FOR_INTERRUPTS();
4028 :
4029 145682 : MemoryContextSwitchTo(ApplyMessageContext);
4030 :
4031 145682 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
4032 :
4033 145646 : if (len != 0)
4034 : {
4035 : /* Loop to process all available data (without blocking). */
4036 : for (;;)
4037 : {
4038 536378 : CHECK_FOR_INTERRUPTS();
4039 :
4040 536378 : if (len == 0)
4041 : {
/* Nothing more to process for now; continue with the feedback and wait steps below. */
4042 143196 : break;
4043 : }
4044 393182 : else if (len < 0)
4045 : {
/* Negative length: the stream is over. The outer loop exits after one last round of feedback. */
4046 20 : ereport(LOG,
4047 : (errmsg("data stream from publisher has ended")));
4048 20 : endofstream = true;
4049 20 : break;
4050 : }
4051 : else
4052 : {
4053 : int c;
4054 : StringInfoData s;
4055 :
4056 393162 : if (ConfigReloadPending)
4057 : {
4058 0 : ConfigReloadPending = false;
4059 0 : ProcessConfigFile(PGC_SIGHUP);
4060 : }
4061 :
4062 : /* Reset timeout. */
4063 393162 : last_recv_timestamp = GetCurrentTimestamp();
4064 393162 : ping_sent = false;
4065 :
4066 393162 : rdt_data.last_recv_time = last_recv_timestamp;
4067 :
4068 : /* Ensure we are reading the data into our memory context. */
4069 393162 : MemoryContextSwitchTo(ApplyMessageContext);
4070 :
4071 393162 : initReadOnlyStringInfo(&s, buf, len);
4072 :
/* The first byte of the message selects how the rest is parsed. */
4073 393162 : c = pq_getmsgbyte(&s);
4074 :
4075 393162 : if (c == PqReplMsg_WALData)
4076 : {
4077 : XLogRecPtr start_lsn;
4078 : XLogRecPtr end_lsn;
4079 : TimestampTz send_time;
4080 :
/* Header fields, in order: start LSN, end LSN, send time. */
4081 369846 : start_lsn = pq_getmsgint64(&s);
4082 369846 : end_lsn = pq_getmsgint64(&s);
4083 369846 : send_time = pq_getmsgint64(&s);
4084 :
/* Advance last_received to the larger of the two LSNs, never backwards. */
4085 369846 : if (last_received < start_lsn)
4086 295662 : last_received = start_lsn;
4087 :
4088 369846 : if (last_received < end_lsn)
4089 0 : last_received = end_lsn;
4090 :
4091 369846 : UpdateWorkerStats(last_received, send_time, false);
4092 :
/* The remainder of the buffer is the logical replication message itself. */
4093 369846 : apply_dispatch(&s);
4094 :
4095 369710 : maybe_advance_nonremovable_xid(&rdt_data, false);
4096 : }
4097 23316 : else if (c == PqReplMsg_Keepalive)
4098 : {
4099 : XLogRecPtr end_lsn;
4100 : TimestampTz timestamp;
4101 : bool reply_requested;
4102 :
/* Fields, in order: end LSN, timestamp, reply-requested flag. */
4103 3318 : end_lsn = pq_getmsgint64(&s);
4104 3318 : timestamp = pq_getmsgint64(&s);
4105 3318 : reply_requested = pq_getmsgbyte(&s);
4106 :
4107 3318 : if (last_received < end_lsn)
4108 1982 : last_received = end_lsn;
4109 :
/* A requested reply is passed as send_feedback()'s 'force' argument. */
4110 3318 : send_feedback(last_received, reply_requested, false);
4111 :
4112 3318 : maybe_advance_nonremovable_xid(&rdt_data, false);
4113 :
4114 3318 : UpdateWorkerStats(last_received, timestamp, true);
4115 : }
4116 19998 : else if (c == PqReplMsg_PrimaryStatusUpdate)
4117 : {
/* Fields, in order: remote LSN, oldest xid, next xid, reply time. */
4118 19998 : rdt_data.remote_lsn = pq_getmsgint64(&s);
4119 19998 : rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4120 19998 : rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4121 19998 : rdt_data.reply_time = pq_getmsgint64(&s);
4122 :
4123 : /*
4124 : * This should never happen, see
4125 : * ProcessStandbyPSRequestMessage. But if it happens
4126 : * due to a bug, we don't want to proceed as it can
4127 : * incorrectly advance oldest_nonremovable_xid.
4128 : */
4129 19998 : if (!XLogRecPtrIsValid(rdt_data.remote_lsn))
4130 0 : elog(ERROR, "cannot get the latest WAL position from the publisher");
4131 :
4132 19998 : maybe_advance_nonremovable_xid(&rdt_data, true);
4133 :
4134 19998 : UpdateWorkerStats(last_received, rdt_data.reply_time, false);
4135 : }
4136 : /* other message types are purposefully ignored */
4137 :
4138 393026 : MemoryContextReset(ApplyMessageContext);
4139 : }
4140 :
/* Fetch the next message, if any, for the next iteration of the inner loop. */
4141 393026 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
4142 : }
4143 : }
4144 :
4145 : /* confirm all writes so far */
4146 145510 : send_feedback(last_received, false, false);
4147 :
4148 : /* Reset the timestamp if no message was received */
4149 145510 : rdt_data.last_recv_time = 0;
4150 :
4151 145510 : maybe_advance_nonremovable_xid(&rdt_data, false);
4152 :
4153 145508 : if (!in_remote_transaction && !in_streamed_transaction)
4154 : {
4155 : /*
4156 : * If we didn't get any transactions for a while there might be
4157 : * unconsumed invalidation messages in the queue, consume them
4158 : * now.
4159 : */
4160 13878 : AcceptInvalidationMessages();
4161 13878 : maybe_reread_subscription();
4162 :
4163 : /*
4164 : * Process any relations that are being synchronized in parallel
4165 : * and any newly added tables or sequences.
4166 : */
4167 13796 : ProcessSyncingRelations(last_received);
4168 : }
4169 :
4170 : /* Cleanup the memory. */
4171 145044 : MemoryContextReset(ApplyMessageContext);
4172 145044 : MemoryContextSwitchTo(TopMemoryContext);
4173 :
4174 : /* Check if we need to exit the streaming loop. */
4175 145044 : if (endofstream)
4176 20 : break;
4177 :
4178 : /*
4179 : * Wait for more data or latch. If we have unflushed transactions,
4180 : * wake up after WalWriterDelay to see if they've been flushed yet (in
4181 : * which case we should send a feedback message). Otherwise, there's
4182 : * no particular urgency about waking up unless we get data or a
4183 : * signal.
4184 : */
4185 145024 : if (!dlist_is_empty(&lsn_mapping))
4186 16660 : wait_time = WalWriterDelay;
4187 : else
4188 128364 : wait_time = NAPTIME_PER_CYCLE;
4189 :
4190 : /*
4191 : * Ensure to wake up when it's possible to advance the non-removable
4192 : * transaction ID, or when the retention duration may have exceeded
4193 : * max_retention_duration.
 *
 * Either adjustment can only shorten wait_time, never lengthen it.
4194 : */
4195 145024 : if (MySubscription->retentionactive)
4196 : {
4197 8600 : if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
4198 142 : rdt_data.xid_advance_interval)
4199 0 : wait_time = Min(wait_time, rdt_data.xid_advance_interval);
4200 8600 : else if (MySubscription->maxretention > 0)
4201 2 : wait_time = Min(wait_time, MySubscription->maxretention);
4202 : }
4203 :
/* 'fd' is the socket handed back by the most recent walrcv_receive() call. */
4204 145024 : rc = WaitLatchOrSocket(MyLatch,
4205 : WL_SOCKET_READABLE | WL_LATCH_SET |
4206 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
4207 : fd, wait_time,
4208 : WAIT_EVENT_LOGICAL_APPLY_MAIN);
4209 :
4210 145024 : if (rc & WL_LATCH_SET)
4211 : {
4212 1412 : ResetLatch(MyLatch);
4213 1412 : CHECK_FOR_INTERRUPTS();
4214 : }
4215 :
4216 144838 : if (ConfigReloadPending)
4217 : {
4218 20 : ConfigReloadPending = false;
4219 20 : ProcessConfigFile(PGC_SIGHUP);
4220 : }
4221 :
4222 144838 : if (rc & WL_TIMEOUT)
4223 : {
4224 : /*
4225 : * We didn't receive anything new. If we haven't heard anything
4226 : * from the server for more than wal_receiver_timeout / 2, ping
4227 : * the server. Also, if it's been longer than
4228 : * wal_receiver_status_interval since the last update we sent,
4229 : * send a status update to the primary anyway, to report any
4230 : * progress in applying WAL.
4231 : */
4232 502 : bool requestReply = false;
4233 :
4234 : /*
4235 : * Check if time since last receive from primary has reached the
4236 : * configured limit.
 *
 * A wal_receiver_timeout of zero or less disables both the
 * timeout error and the ping.
4237 : */
4238 502 : if (wal_receiver_timeout > 0)
4239 : {
4240 502 : TimestampTz now = GetCurrentTimestamp();
4241 : TimestampTz timeout;
4242 :
4243 502 : timeout =
4244 502 : TimestampTzPlusMilliseconds(last_recv_timestamp,
4245 : wal_receiver_timeout);
4246 :
4247 502 : if (now >= timeout)
4248 0 : ereport(ERROR,
4249 : (errcode(ERRCODE_CONNECTION_FAILURE),
4250 : errmsg("terminating logical replication worker due to timeout")));
4251 :
4252 : /* Check to see if it's time for a ping. */
/* ping_sent is cleared again whenever a message is received, so at most one ping is sent per silent period. */
4253 502 : if (!ping_sent)
4254 : {
4255 502 : timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
4256 : (wal_receiver_timeout / 2));
4257 502 : if (now >= timeout)
4258 : {
4259 0 : requestReply = true;
4260 0 : ping_sent = true;
4261 : }
4262 : }
4263 : }
4264 :
/* requestReply doubles as the 'force' argument, so a ping is always sent. */
4265 502 : send_feedback(last_received, requestReply, requestReply);
4266 :
4267 502 : maybe_advance_nonremovable_xid(&rdt_data, false);
4268 :
4269 : /*
4270 : * Force reporting to ensure long idle periods don't lead to
4271 : * arbitrarily delayed stats. Stats can only be reported outside
4272 : * of (implicit or explicit) transactions. That shouldn't lead to
4273 : * stats being delayed for long, because transactions are either
4274 : * sent as a whole on commit or streamed. Streamed transactions
4275 : * are spilled to disk and applied on commit.
4276 : */
4277 502 : if (!IsTransactionState())
4278 502 : pgstat_report_stat(true);
4279 : }
4280 : }
4281 :
4282 : /* Pop the error context stack */
4283 20 : error_context_stack = errcallback.previous;
4284 20 : apply_error_context_stack = error_context_stack;
4285 :
4286 : /* All done */
4287 20 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
4288 0 : }
4289 :
4290 : /*
4291 : * Send a Standby Status Update message to server.
4292 : *
4293 : * 'recvpos' is the latest LSN we've received data to, force is set if we need
4294 : * to send a response to avoid timeouts.
4295 : */
4296 : static void
4297 149330 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
4298 : {
4299 : static StringInfo reply_message = NULL;
4300 : static TimestampTz send_time = 0;
4301 :
4302 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
4303 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
4304 :
4305 : XLogRecPtr writepos;
4306 : XLogRecPtr flushpos;
4307 : TimestampTz now;
4308 : bool have_pending_txes;
4309 :
4310 : /*
4311 : * If the user doesn't want status to be reported to the publisher, be
4312 : * sure to exit before doing anything at all.
4313 : */
4314 149330 : if (!force && wal_receiver_status_interval <= 0)
4315 47368 : return;
4316 :
4317 : /* It's legal to not pass a recvpos */
4318 149330 : if (recvpos < last_recvpos)
4319 0 : recvpos = last_recvpos;
4320 :
4321 149330 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4322 :
4323 : /*
4324 : * No outstanding transactions to flush, we can report the latest received
4325 : * position. This is important for synchronous replication.
4326 : */
4327 149330 : if (!have_pending_txes)
4328 131710 : flushpos = writepos = recvpos;
4329 :
4330 149330 : if (writepos < last_writepos)
4331 0 : writepos = last_writepos;
4332 :
4333 149330 : if (flushpos < last_flushpos)
4334 17514 : flushpos = last_flushpos;
4335 :
4336 149330 : now = GetCurrentTimestamp();
4337 :
4338 : /* if we've already reported everything we're good */
4339 149330 : if (!force &&
4340 149324 : writepos == last_writepos &&
4341 47998 : flushpos == last_flushpos &&
4342 47622 : !TimestampDifferenceExceeds(send_time, now,
4343 : wal_receiver_status_interval * 1000))
4344 47368 : return;
4345 101962 : send_time = now;
4346 :
4347 101962 : if (!reply_message)
4348 : {
4349 844 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4350 :
4351 844 : reply_message = makeStringInfo();
4352 844 : MemoryContextSwitchTo(oldctx);
4353 : }
4354 : else
4355 101118 : resetStringInfo(reply_message);
4356 :
4357 101962 : pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
4358 101962 : pq_sendint64(reply_message, recvpos); /* write */
4359 101962 : pq_sendint64(reply_message, flushpos); /* flush */
4360 101962 : pq_sendint64(reply_message, writepos); /* apply */
4361 101962 : pq_sendint64(reply_message, now); /* sendTime */
4362 101962 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
4363 :
4364 101962 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4365 : force,
4366 : LSN_FORMAT_ARGS(recvpos),
4367 : LSN_FORMAT_ARGS(writepos),
4368 : LSN_FORMAT_ARGS(flushpos));
4369 :
4370 101962 : walrcv_send(LogRepWorkerWalRcvConn,
4371 : reply_message->data, reply_message->len);
4372 :
4373 101962 : if (recvpos > last_recvpos)
4374 101332 : last_recvpos = recvpos;
4375 101962 : if (writepos > last_writepos)
4376 101332 : last_writepos = writepos;
4377 101962 : if (flushpos > last_flushpos)
4378 100948 : last_flushpos = flushpos;
4379 : }
4380 :
4381 : /*
4382 : * Attempt to advance the non-removable transaction ID.
4383 : *
4384 : * See comments atop worker.c for details.
4385 : */
4386 : static void
4387 539038 : maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
4388 : bool status_received)
4389 : {
4390 539038 : if (!can_advance_nonremovable_xid(rdt_data))
4391 510048 : return;
4392 :
4393 28990 : process_rdt_phase_transition(rdt_data, status_received);
4394 : }
4395 :
4396 : /*
4397 : * Preliminary check to determine if advancing the non-removable transaction ID
4398 : * is allowed.
4399 : */
4400 : static bool
4401 539038 : can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
4402 : {
4403 : /*
4404 : * It is sufficient to manage non-removable transaction ID for a
4405 : * subscription by the main apply worker to detect update_deleted reliably
4406 : * even for table sync or parallel apply workers.
4407 : */
4408 539038 : if (!am_leader_apply_worker())
4409 500 : return false;
4410 :
4411 : /* No need to advance if retaining dead tuples is not required */
4412 538538 : if (!MySubscription->retaindeadtuples)
4413 509548 : return false;
4414 :
4415 28990 : return true;
4416 : }
4417 :
4418 : /*
4419 : * Process phase transitions during the non-removable transaction ID
4420 : * advancement. See comments atop worker.c for details of the transition.
4421 : */
4422 : static void
4423 49158 : process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
4424 : bool status_received)
4425 : {
4426 49158 : switch (rdt_data->phase)
4427 : {
4428 400 : case RDT_GET_CANDIDATE_XID:
4429 400 : get_candidate_xid(rdt_data);
4430 400 : break;
4431 20010 : case RDT_REQUEST_PUBLISHER_STATUS:
4432 20010 : request_publisher_status(rdt_data);
4433 20010 : break;
4434 28492 : case RDT_WAIT_FOR_PUBLISHER_STATUS:
4435 28492 : wait_for_publisher_status(rdt_data, status_received);
4436 28492 : break;
4437 252 : case RDT_WAIT_FOR_LOCAL_FLUSH:
4438 252 : wait_for_local_flush(rdt_data);
4439 252 : break;
4440 2 : case RDT_STOP_CONFLICT_INFO_RETENTION:
4441 2 : stop_conflict_info_retention(rdt_data);
4442 2 : break;
4443 2 : case RDT_RESUME_CONFLICT_INFO_RETENTION:
4444 2 : resume_conflict_info_retention(rdt_data);
4445 0 : break;
4446 : }
4447 49156 : }
4448 :
4449 : /*
4450 : * Workhorse for the RDT_GET_CANDIDATE_XID phase.
4451 : */
4452 : static void
4453 400 : get_candidate_xid(RetainDeadTuplesData *rdt_data)
4454 : {
4455 : TransactionId oldest_running_xid;
4456 : TimestampTz now;
4457 :
4458 : /*
4459 : * Use last_recv_time when applying changes in the loop to avoid
4460 : * unnecessary system time retrieval. If last_recv_time is not available,
4461 : * obtain the current timestamp.
4462 : */
4463 400 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4464 :
4465 : /*
4466 : * Compute the candidate_xid and request the publisher status at most once
4467 : * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4468 : * details on how this value is dynamically adjusted. This is to avoid
4469 : * using CPU and network resources without making much progress.
4470 : */
4471 400 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4472 : rdt_data->xid_advance_interval))
4473 0 : return;
4474 :
4475 : /*
4476 : * Immediately update the timer, even if the function returns later
4477 : * without setting candidate_xid due to inactivity on the subscriber. This
4478 : * avoids frequent calls to GetOldestActiveTransactionId.
4479 : */
4480 400 : rdt_data->candidate_xid_time = now;
4481 :
4482 : /*
4483 : * Consider transactions in the current database, as only dead tuples from
4484 : * this database are required for conflict detection.
4485 : */
4486 400 : oldest_running_xid = GetOldestActiveTransactionId(false, false);
4487 :
4488 : /*
4489 : * Oldest active transaction ID (oldest_running_xid) can't be behind any
4490 : * of its previously computed value.
4491 : */
4492 : Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4493 : oldest_running_xid));
4494 :
4495 : /* Return if the oldest_nonremovable_xid cannot be advanced */
4496 400 : if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4497 : oldest_running_xid))
4498 : {
4499 306 : adjust_xid_advance_interval(rdt_data, false);
4500 306 : return;
4501 : }
4502 :
4503 94 : adjust_xid_advance_interval(rdt_data, true);
4504 :
4505 94 : rdt_data->candidate_xid = oldest_running_xid;
4506 94 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4507 :
4508 : /* process the next phase */
4509 94 : process_rdt_phase_transition(rdt_data, false);
4510 : }
4511 :
4512 : /*
4513 : * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4514 : */
4515 : static void
4516 20010 : request_publisher_status(RetainDeadTuplesData *rdt_data)
4517 : {
4518 : static StringInfo request_message = NULL;
4519 :
4520 20010 : if (!request_message)
4521 : {
4522 24 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4523 :
4524 24 : request_message = makeStringInfo();
4525 24 : MemoryContextSwitchTo(oldctx);
4526 : }
4527 : else
4528 19986 : resetStringInfo(request_message);
4529 :
4530 : /*
4531 : * Send the current time to update the remote walsender's latest reply
4532 : * message received time.
4533 : */
4534 20010 : pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
4535 20010 : pq_sendint64(request_message, GetCurrentTimestamp());
4536 :
4537 20010 : elog(DEBUG2, "sending publisher status request message");
4538 :
4539 : /* Send a request for the publisher status */
4540 20010 : walrcv_send(LogRepWorkerWalRcvConn,
4541 : request_message->data, request_message->len);
4542 :
4543 20010 : rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
4544 :
4545 : /*
4546 : * Skip calling maybe_advance_nonremovable_xid() since further transition
4547 : * is possible only once we receive the publisher status message.
4548 : */
4549 20010 : }
4550 :
4551 : /*
4552 : * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
4553 : */
4554 : static void
4555 28492 : wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
4556 : bool status_received)
4557 : {
4558 : /*
4559 : * Return if we have requested but not yet received the publisher status.
4560 : */
4561 28492 : if (!status_received)
4562 8494 : return;
4563 :
4564 : /*
4565 : * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4566 : * retaining conflict information for this worker.
4567 : */
4568 19998 : if (should_stop_conflict_info_retention(rdt_data))
4569 : {
4570 0 : rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
4571 0 : return;
4572 : }
4573 :
4574 19998 : if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
4575 82 : rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4576 :
4577 : /*
4578 : * Check if all remote concurrent transactions that were active at the
4579 : * first status request have now completed. If completed, proceed to the
4580 : * next phase; otherwise, continue checking the publisher status until
4581 : * these transactions finish.
4582 : *
4583 : * It's possible that transactions in the commit phase during the last
4584 : * cycle have now finished committing, but remote_oldestxid remains older
4585 : * than remote_wait_for. This can happen if some old transaction came in
4586 : * the commit phase when we requested status in this cycle. We do not
4587 : * handle this case explicitly as it's rare and the benefit doesn't
4588 : * justify the required complexity. Tracking would require either caching
4589 : * all xids at the publisher or sending them to subscribers. The condition
4590 : * will resolve naturally once the remaining transactions are finished.
4591 : *
4592 : * Directly advancing the non-removable transaction ID is possible if
4593 : * there are no activities on the publisher since the last advancement
4594 : * cycle. However, it requires maintaining two fields, last_remote_nextxid
4595 : * and last_remote_lsn, within the structure for comparison with the
4596 : * current cycle's values. Considering the minimal cost of continuing in
4597 : * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4598 : * advance the transaction ID here.
4599 : */
4600 19998 : if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
4601 : rdt_data->remote_oldestxid))
4602 82 : rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
4603 : else
4604 19916 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4605 :
4606 : /* process the next phase */
4607 19998 : process_rdt_phase_transition(rdt_data, false);
4608 : }
4609 :
4610 : /*
4611 : * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
4612 : */
4613 : static void
4614 252 : wait_for_local_flush(RetainDeadTuplesData *rdt_data)
4615 : {
4616 : Assert(XLogRecPtrIsValid(rdt_data->remote_lsn) &&
4617 : TransactionIdIsValid(rdt_data->candidate_xid));
4618 :
4619 : /*
4620 : * We expect the publisher and subscriber clocks to be in sync using time
4621 : * sync service like NTP. Otherwise, we will advance this worker's
4622 : * oldest_nonremovable_xid prematurely, leading to the removal of rows
4623 : * required to detect update_deleted reliably. This check primarily
4624 : * addresses scenarios where the publisher's clock falls behind; if the
4625 : * publisher's clock is ahead, subsequent transactions will naturally bear
4626 : * later commit timestamps, conforming to the design outlined atop
4627 : * worker.c.
4628 : *
4629 : * XXX Consider waiting for the publisher's clock to catch up with the
4630 : * subscriber's before proceeding to the next phase.
4631 : */
4632 252 : if (TimestampDifferenceExceeds(rdt_data->reply_time,
4633 : rdt_data->candidate_xid_time, 0))
4634 0 : ereport(ERROR,
4635 : errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4636 : errdetail_internal("The clock on the publisher is behind that of the subscriber."));
4637 :
4638 : /*
4639 : * Do not attempt to advance the non-removable transaction ID when table
4640 : * sync is in progress. During this time, changes from a single
4641 : * transaction may be applied by multiple table sync workers corresponding
4642 : * to the target tables. So, it's necessary for all table sync workers to
4643 : * apply and flush the corresponding changes before advancing the
4644 : * transaction ID, otherwise, dead tuples that are still needed for
4645 : * conflict detection in table sync workers could be removed prematurely.
4646 : * However, confirming the apply and flush progress across all table sync
4647 : * workers is complex and not worth the effort, so we simply return if not
4648 : * all tables are in the READY state.
4649 : *
4650 : * Advancing the transaction ID is necessary even when no tables are
4651 : * currently subscribed, to avoid retaining dead tuples unnecessarily.
4652 : * While it might seem safe to skip all phases and directly assign
4653 : * candidate_xid to oldest_nonremovable_xid during the
4654 : * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
4655 : * concurrently add tables to the subscription, the apply worker may not
4656 : * process invalidations in time. Consequently,
4657 : * HasSubscriptionTablesCached() might miss the new tables, leading to
4658 : * premature advancement of oldest_nonremovable_xid.
4659 : *
4660 : * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
4661 : * invalidations are guaranteed to be processed before applying changes
4662 : * from newly added tables while waiting for the local flush to reach
4663 : * remote_lsn.
4664 : *
4665 : * Additionally, even if we check for subscription tables during
4666 : * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
4667 : * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
4668 : * subscription tables at this stage to prevent unnecessary tuple
4669 : * retention.
4670 : */
4671 252 : if (HasSubscriptionTablesCached() && !AllTablesyncsReady())
4672 : {
4673 : TimestampTz now;
4674 :
4675 52 : now = rdt_data->last_recv_time
4676 26 : ? rdt_data->last_recv_time : GetCurrentTimestamp();
4677 :
4678 : /*
4679 : * Record the time spent waiting for table sync, it is needed for the
4680 : * timeout check in should_stop_conflict_info_retention().
4681 : */
4682 26 : rdt_data->table_sync_wait_time =
4683 26 : TimestampDifferenceMilliseconds(rdt_data->candidate_xid_time, now);
4684 :
4685 26 : return;
4686 : }
4687 :
4688 : /*
4689 : * We don't need to maintain oldest_nonremovable_xid if we decide to stop
4690 : * retaining conflict information for this worker.
4691 : */
4692 226 : if (should_stop_conflict_info_retention(rdt_data))
4693 : {
4694 2 : rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
4695 2 : return;
4696 : }
4697 :
4698 : /*
4699 : * Update and check the remote flush position if we are applying changes
4700 : * in a loop. This is done at most once per WalWriterDelay to avoid
4701 : * performing costly operations in get_flush_position() too frequently
4702 : * during change application.
4703 : */
4704 304 : if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
4705 80 : TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
4706 : rdt_data->last_recv_time, WalWriterDelay))
4707 : {
4708 : XLogRecPtr writepos;
4709 : XLogRecPtr flushpos;
4710 : bool have_pending_txes;
4711 :
4712 : /* Fetch the latest remote flush position */
4713 30 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4714 :
4715 30 : if (flushpos > last_flushpos)
4716 0 : last_flushpos = flushpos;
4717 :
4718 30 : rdt_data->flushpos_update_time = rdt_data->last_recv_time;
4719 : }
4720 :
4721 : /* Return to wait for the changes to be applied */
4722 224 : if (last_flushpos < rdt_data->remote_lsn)
4723 146 : return;
4724 :
4725 : /*
4726 : * Reaching this point implies should_stop_conflict_info_retention()
4727 : * returned false earlier, meaning that the most recent duration for
4728 : * advancing the non-removable transaction ID is within the
4729 : * max_retention_duration or max_retention_duration is set to 0.
4730 : *
4731 : * Therefore, if conflict info retention was previously stopped due to a
4732 : * timeout, it is now safe to resume retention.
4733 : */
4734 78 : if (!MySubscription->retentionactive)
4735 : {
4736 2 : rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
4737 2 : return;
4738 : }
4739 :
4740 : /*
4741 : * Reaching here means the remote WAL position has been received, and all
4742 : * transactions up to that position on the publisher have been applied and
4743 : * flushed locally. So, we can advance the non-removable transaction ID.
4744 : */
4745 76 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
4746 76 : MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
4747 76 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
4748 :
4749 76 : elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
4750 : LSN_FORMAT_ARGS(rdt_data->remote_lsn),
4751 : rdt_data->candidate_xid);
4752 :
4753 : /* Notify launcher to update the xmin of the conflict slot */
4754 76 : ApplyLauncherWakeup();
4755 :
4756 76 : reset_retention_data_fields(rdt_data);
4757 :
4758 : /* process the next phase */
4759 76 : process_rdt_phase_transition(rdt_data, false);
4760 : }
4761 :
4762 : /*
4763 : * Check whether conflict information retention should be stopped due to
4764 : * exceeding the maximum wait time (max_retention_duration).
4765 : *
4766 : * If retention should be stopped, return true. Otherwise, return false.
4767 : */
4768 : static bool
4769 20224 : should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4770 : {
4771 : TimestampTz now;
4772 :
4773 : Assert(TransactionIdIsValid(rdt_data->candidate_xid));
4774 : Assert(rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS ||
4775 : rdt_data->phase == RDT_WAIT_FOR_LOCAL_FLUSH);
4776 :
4777 20224 : if (!MySubscription->maxretention)
4778 20222 : return false;
4779 :
4780 : /*
4781 : * Use last_recv_time when applying changes in the loop to avoid
4782 : * unnecessary system time retrieval. If last_recv_time is not available,
4783 : * obtain the current timestamp.
4784 : */
4785 2 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4786 :
4787 : /*
4788 : * Return early if the wait time has not exceeded the configured maximum
4789 : * (max_retention_duration). Time spent waiting for table synchronization
4790 : * is excluded from this calculation, as it occurs infrequently.
4791 : */
4792 2 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4793 2 : MySubscription->maxretention +
4794 2 : rdt_data->table_sync_wait_time))
4795 0 : return false;
4796 :
4797 2 : return true;
4798 : }
4799 :
4800 : /*
4801 : * Workhorse for the RDT_STOP_CONFLICT_INFO_RETENTION phase.
4802 : */
4803 : static void
4804 2 : stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4805 : {
4806 : /* Stop retention if not yet */
4807 2 : if (MySubscription->retentionactive)
4808 : {
4809 : /*
4810 : * If the retention status cannot be updated (e.g., due to active
4811 : * transaction), skip further processing to avoid inconsistent
4812 : * retention behavior.
4813 : */
4814 2 : if (!update_retention_status(false))
4815 0 : return;
4816 :
4817 2 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
4818 2 : MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
4819 2 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
4820 :
4821 2 : ereport(LOG,
4822 : errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
4823 : MySubscription->name),
4824 : errdetail("Retention is stopped because the apply process has not caught up with the publisher within the configured max_retention_duration."));
4825 : }
4826 :
4827 : Assert(!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid));
4828 :
4829 : /*
4830 : * If retention has been stopped, reset to the initial phase to retry
4831 : * resuming retention. This reset is required to recalculate the current
4832 : * wait time and resume retention if the time falls within
4833 : * max_retention_duration.
4834 : */
4835 2 : reset_retention_data_fields(rdt_data);
4836 : }
4837 :
4838 : /*
4839 : * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
4840 : */
4841 : static void
4842 2 : resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
4843 : {
4844 : /* We can't resume retention without updating retention status. */
4845 2 : if (!update_retention_status(true))
4846 0 : return;
4847 :
4848 2 : ereport(LOG,
4849 : errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
4850 : MySubscription->name),
4851 : MySubscription->maxretention
4852 : ? errdetail("Retention is re-enabled because the apply process has caught up with the publisher within the configured max_retention_duration.")
4853 : : errdetail("Retention is re-enabled because max_retention_duration has been set to unlimited."));
4854 :
4855 : /*
4856 : * Restart the worker to let the launcher initialize
4857 : * oldest_nonremovable_xid at startup.
4858 : *
4859 : * While it's technically possible to derive this value on-the-fly using
4860 : * the conflict detection slot's xmin, doing so risks a race condition:
4861 : * the launcher might clean slot.xmin just after retention resumes. This
4862 : * would make oldest_nonremovable_xid unreliable, especially during xid
4863 : * wraparound.
4864 : *
4865 : * Although this can be prevented by introducing heavy weight locking, the
4866 : * complexity it will bring doesn't seem worthwhile given how rarely
4867 : * retention is resumed.
4868 : */
4869 2 : apply_worker_exit();
4870 : }
4871 :
4872 : /*
4873 : * Updates pg_subscription.subretentionactive to the given value within a
4874 : * new transaction.
4875 : *
4876 : * If already inside an active transaction, skips the update and returns
4877 : * false.
4878 : *
4879 : * Returns true if the update is successfully performed.
4880 : */
4881 : static bool
4882 4 : update_retention_status(bool active)
4883 : {
4884 : /*
4885 : * Do not update the catalog during an active transaction. The transaction
4886 : * may be started during change application, leading to a possible
4887 : * rollback of catalog updates if the application fails subsequently.
4888 : */
4889 4 : if (IsTransactionState())
4890 0 : return false;
4891 :
4892 4 : StartTransactionCommand();
4893 :
4894 : /*
4895 : * Updating pg_subscription might involve TOAST table access, so ensure we
4896 : * have a valid snapshot.
4897 : */
4898 4 : PushActiveSnapshot(GetTransactionSnapshot());
4899 :
4900 : /* Update pg_subscription.subretentionactive */
4901 4 : UpdateDeadTupleRetentionStatus(MySubscription->oid, active);
4902 :
4903 4 : PopActiveSnapshot();
4904 4 : CommitTransactionCommand();
4905 :
4906 : /* Notify launcher to update the conflict slot */
4907 4 : ApplyLauncherWakeup();
4908 :
4909 4 : MySubscription->retentionactive = active;
4910 :
4911 4 : return true;
4912 : }
4913 :
4914 : /*
4915 : * Reset all data fields of RetainDeadTuplesData except those used to
4916 : * determine the timing for the next round of transaction ID advancement. We
4917 : * can even use flushpos_update_time in the next round to decide whether to get
4918 : * the latest flush position.
4919 : */
4920 : static void
4921 78 : reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
4922 : {
4923 78 : rdt_data->phase = RDT_GET_CANDIDATE_XID;
4924 78 : rdt_data->remote_lsn = InvalidXLogRecPtr;
4925 78 : rdt_data->remote_oldestxid = InvalidFullTransactionId;
4926 78 : rdt_data->remote_nextxid = InvalidFullTransactionId;
4927 78 : rdt_data->reply_time = 0;
4928 78 : rdt_data->remote_wait_for = InvalidFullTransactionId;
4929 78 : rdt_data->candidate_xid = InvalidTransactionId;
4930 78 : rdt_data->table_sync_wait_time = 0;
4931 78 : }
4932 :
4933 : /*
4934 : * Adjust the interval for advancing non-removable transaction IDs.
4935 : *
4936 : * If there is no activity on the node or retention has been stopped, we
4937 : * progressively double the interval used to advance non-removable transaction
4938 : * ID. This helps conserve CPU and network resources when there's little benefit
4939 : * to frequent updates.
4940 : *
4941 : * The interval is capped by the lowest of the following:
4942 : * - wal_receiver_status_interval (if set and retention is active),
4943 : * - a default maximum of 3 minutes,
4944 : * - max_retention_duration (if retention is active).
4945 : *
4946 : * This ensures the interval never exceeds the retention boundary, even if other
4947 : * limits are higher. Once activity resumes on the node and the retention is
4948 : * active, the interval is reset to lesser of 100ms and max_retention_duration,
4949 : * allowing timely advancement of non-removable transaction ID.
4950 : *
4951 : * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4952 : * consider the other interval or a separate GUC if the need arises.
4953 : */
4954 : static void
4955 400 : adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
4956 : {
4957 400 : if (rdt_data->xid_advance_interval && !new_xid_found)
4958 0 : {
4959 0 : int max_interval = wal_receiver_status_interval
4960 0 : ? wal_receiver_status_interval * 1000
4961 0 : : MAX_XID_ADVANCE_INTERVAL;
4962 :
4963 : /*
4964 : * No new transaction ID has been assigned since the last check, so
4965 : * double the interval, but not beyond the maximum allowable value.
4966 : */
4967 0 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4968 : max_interval);
4969 : }
4970 400 : else if (rdt_data->xid_advance_interval &&
4971 0 : !MySubscription->retentionactive)
4972 : {
4973 : /*
4974 : * Retention has been stopped, so double the interval-capped at a
4975 : * maximum of 3 minutes. The wal_receiver_status_interval is
4976 : * intentionally not used as a upper bound, since the likelihood of
4977 : * retention resuming is lower than that of general activity resuming.
4978 : */
4979 0 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4980 : MAX_XID_ADVANCE_INTERVAL);
4981 : }
4982 : else
4983 : {
4984 : /*
4985 : * A new transaction ID was found or the interval is not yet
4986 : * initialized, so set the interval to the minimum value.
4987 : */
4988 400 : rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
4989 : }
4990 :
4991 : /*
4992 : * Ensure the wait time remains within the maximum retention time limit
4993 : * when retention is active.
4994 : */
4995 400 : if (MySubscription->retentionactive)
4996 398 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
4997 : MySubscription->maxretention);
4998 400 : }
4999 :
5000 : /*
5001 : * Exit routine for apply workers due to subscription parameter changes.
5002 : */
5003 : static void
5004 90 : apply_worker_exit(void)
5005 : {
5006 90 : if (am_parallel_apply_worker())
5007 : {
5008 : /*
5009 : * Don't stop the parallel apply worker as the leader will detect the
5010 : * subscription parameter change and restart logical replication later
5011 : * anyway. This also prevents the leader from reporting errors when
5012 : * trying to communicate with a stopped parallel apply worker, which
5013 : * would accidentally disable subscriptions if disable_on_error was
5014 : * set.
5015 : */
5016 0 : return;
5017 : }
5018 :
5019 : /*
5020 : * Reset the last-start time for this apply worker so that the launcher
5021 : * will restart it without waiting for wal_retrieve_retry_interval if the
5022 : * subscription is still active, and so that we won't leak that hash table
5023 : * entry if it isn't.
5024 : */
5025 90 : if (am_leader_apply_worker())
5026 90 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5027 :
5028 90 : proc_exit(0);
5029 : }
5030 :
5031 : /*
5032 : * Reread subscription info if needed.
5033 : *
5034 : * For significant changes, we react by exiting the current process; a new
5035 : * one will be launched afterwards if needed.
5036 : */
5037 : void
5038 15932 : maybe_reread_subscription(void)
5039 : {
5040 : MemoryContext oldctx;
5041 : Subscription *newsub;
5042 15932 : bool started_tx = false;
5043 :
5044 : /* When cache state is valid there is nothing to do here. */
5045 15932 : if (MySubscriptionValid)
5046 15758 : return;
5047 :
5048 : /* This function might be called inside or outside of transaction. */
5049 174 : if (!IsTransactionState())
5050 : {
5051 166 : StartTransactionCommand();
5052 166 : started_tx = true;
5053 : }
5054 :
5055 : /* Ensure allocations in permanent context. */
5056 174 : oldctx = MemoryContextSwitchTo(ApplyContext);
5057 :
5058 174 : newsub = GetSubscription(MyLogicalRepWorker->subid, true);
5059 :
5060 : /*
5061 : * Exit if the subscription was removed. This normally should not happen
5062 : * as the worker gets killed during DROP SUBSCRIPTION.
5063 : */
5064 174 : if (!newsub)
5065 : {
5066 0 : ereport(LOG,
5067 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
5068 : MySubscription->name)));
5069 :
5070 : /* Ensure we remove no-longer-useful entry for worker's start time */
5071 0 : if (am_leader_apply_worker())
5072 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5073 :
5074 0 : proc_exit(0);
5075 : }
5076 :
5077 : /* Exit if the subscription was disabled. */
5078 174 : if (!newsub->enabled)
5079 : {
5080 28 : ereport(LOG,
5081 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
5082 : MySubscription->name)));
5083 :
5084 28 : apply_worker_exit();
5085 : }
5086 :
5087 : /* !slotname should never happen when enabled is true. */
5088 : Assert(newsub->slotname);
5089 :
5090 : /* two-phase cannot be altered while the worker is running */
5091 : Assert(newsub->twophasestate == MySubscription->twophasestate);
5092 :
5093 : /*
5094 : * Exit if any parameter that affects the remote connection was changed.
5095 : * The launcher will start a new worker but note that the parallel apply
5096 : * worker won't restart if the streaming option's value is changed from
5097 : * 'parallel' to any other value or the server decides not to stream the
5098 : * in-progress transaction.
5099 : */
5100 146 : if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
5101 142 : strcmp(newsub->name, MySubscription->name) != 0 ||
5102 140 : strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
5103 140 : newsub->binary != MySubscription->binary ||
5104 128 : newsub->stream != MySubscription->stream ||
5105 118 : newsub->passwordrequired != MySubscription->passwordrequired ||
5106 118 : strcmp(newsub->origin, MySubscription->origin) != 0 ||
5107 114 : newsub->owner != MySubscription->owner ||
5108 112 : !equal(newsub->publications, MySubscription->publications))
5109 : {
5110 52 : if (am_parallel_apply_worker())
5111 0 : ereport(LOG,
5112 : (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
5113 : MySubscription->name)));
5114 : else
5115 52 : ereport(LOG,
5116 : (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
5117 : MySubscription->name)));
5118 :
5119 52 : apply_worker_exit();
5120 : }
5121 :
5122 : /*
5123 : * Exit if the subscription owner's superuser privileges have been
5124 : * revoked.
5125 : */
5126 94 : if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
5127 : {
5128 8 : if (am_parallel_apply_worker())
5129 0 : ereport(LOG,
5130 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
5131 : MySubscription->name));
5132 : else
5133 8 : ereport(LOG,
5134 : errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
5135 : MySubscription->name));
5136 :
5137 8 : apply_worker_exit();
5138 : }
5139 :
5140 : /* Check for other changes that should never happen too. */
5141 86 : if (newsub->dbid != MySubscription->dbid)
5142 : {
5143 0 : elog(ERROR, "subscription %u changed unexpectedly",
5144 : MyLogicalRepWorker->subid);
5145 : }
5146 :
5147 : /* Clean old subscription info and switch to new one. */
5148 86 : FreeSubscription(MySubscription);
5149 86 : MySubscription = newsub;
5150 :
5151 86 : MemoryContextSwitchTo(oldctx);
5152 :
5153 : /* Change synchronous commit according to the user's wishes */
5154 86 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
5155 : PGC_BACKEND, PGC_S_OVERRIDE);
5156 :
5157 86 : if (started_tx)
5158 84 : CommitTransactionCommand();
5159 :
5160 86 : MySubscriptionValid = true;
5161 : }
5162 :
5163 : /*
5164 : * Callback from subscription syscache invalidation.
5165 : */
5166 : static void
5167 182 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
5168 : {
5169 182 : MySubscriptionValid = false;
5170 182 : }
5171 :
5172 : /*
5173 : * subxact_info_write
5174 : * Store information about subxacts for a toplevel transaction.
5175 : *
5176 : * For each subxact we store offset of its first change in the main file.
5177 : * The file is always over-written as a whole.
5178 : *
5179 : * XXX We should only store subxacts that were not aborted yet.
5180 : */
5181 : static void
5182 742 : subxact_info_write(Oid subid, TransactionId xid)
5183 : {
5184 : char path[MAXPGPATH];
5185 : Size len;
5186 : BufFile *fd;
5187 :
5188 : Assert(TransactionIdIsValid(xid));
5189 :
5190 : /* construct the subxact filename */
5191 742 : subxact_filename(path, subid, xid);
5192 :
5193 : /* Delete the subxacts file, if exists. */
5194 742 : if (subxact_data.nsubxacts == 0)
5195 : {
5196 578 : cleanup_subxact_info();
5197 578 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5198 :
5199 578 : return;
5200 : }
5201 :
5202 : /*
5203 : * Create the subxact file if it not already created, otherwise open the
5204 : * existing file.
5205 : */
5206 164 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
5207 : true);
5208 164 : if (fd == NULL)
5209 16 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
5210 :
5211 164 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
5212 :
5213 : /* Write the subxact count and subxact info */
5214 164 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
5215 164 : BufFileWrite(fd, subxact_data.subxacts, len);
5216 :
5217 164 : BufFileClose(fd);
5218 :
5219 : /* free the memory allocated for subxact info */
5220 164 : cleanup_subxact_info();
5221 : }
5222 :
5223 : /*
5224 : * subxact_info_read
5225 : * Restore information about subxacts of a streamed transaction.
5226 : *
5227 : * Read information about subxacts into the structure subxact_data that can be
5228 : * used later.
5229 : */
5230 : static void
5231 686 : subxact_info_read(Oid subid, TransactionId xid)
5232 : {
5233 : char path[MAXPGPATH];
5234 : Size len;
5235 : BufFile *fd;
5236 : MemoryContext oldctx;
5237 :
5238 : Assert(!subxact_data.subxacts);
5239 : Assert(subxact_data.nsubxacts == 0);
5240 : Assert(subxact_data.nsubxacts_max == 0);
5241 :
5242 : /*
5243 : * If the subxact file doesn't exist that means we don't have any subxact
5244 : * info.
5245 : */
5246 686 : subxact_filename(path, subid, xid);
5247 686 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
5248 : true);
5249 686 : if (fd == NULL)
5250 528 : return;
5251 :
5252 : /* read number of subxact items */
5253 158 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
5254 :
5255 158 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
5256 :
5257 : /* we keep the maximum as a power of 2 */
5258 158 : subxact_data.nsubxacts_max = 1 << pg_ceil_log2_32(subxact_data.nsubxacts);
5259 :
5260 : /*
5261 : * Allocate subxact information in the logical streaming context. We need
5262 : * this information during the complete stream so that we can add the sub
5263 : * transaction info to this. On stream stop we will flush this information
5264 : * to the subxact file and reset the logical streaming context.
5265 : */
5266 158 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
5267 158 : subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
5268 : sizeof(SubXactInfo));
5269 158 : MemoryContextSwitchTo(oldctx);
5270 :
5271 158 : if (len > 0)
5272 158 : BufFileReadExact(fd, subxact_data.subxacts, len);
5273 :
5274 158 : BufFileClose(fd);
5275 : }
5276 :
5277 : /*
5278 : * subxact_info_add
5279 : * Add information about a subxact (offset in the main file).
5280 : */
5281 : static void
5282 205026 : subxact_info_add(TransactionId xid)
5283 : {
5284 205026 : SubXactInfo *subxacts = subxact_data.subxacts;
5285 : int64 i;
5286 :
5287 : /* We must have a valid top level stream xid and a stream fd. */
5288 : Assert(TransactionIdIsValid(stream_xid));
5289 : Assert(stream_fd != NULL);
5290 :
5291 : /*
5292 : * If the XID matches the toplevel transaction, we don't want to add it.
5293 : */
5294 205026 : if (stream_xid == xid)
5295 184778 : return;
5296 :
5297 : /*
5298 : * In most cases we're checking the same subxact as we've already seen in
5299 : * the last call, so make sure to ignore it (this change comes later).
5300 : */
5301 20248 : if (subxact_data.subxact_last == xid)
5302 20096 : return;
5303 :
5304 : /* OK, remember we're processing this XID. */
5305 152 : subxact_data.subxact_last = xid;
5306 :
5307 : /*
5308 : * Check if the transaction is already present in the array of subxact. We
5309 : * intentionally scan the array from the tail, because we're likely adding
5310 : * a change for the most recent subtransactions.
5311 : *
5312 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
5313 : * would allow us to use binary search here.
5314 : */
5315 190 : for (i = subxact_data.nsubxacts; i > 0; i--)
5316 : {
5317 : /* found, so we're done */
5318 152 : if (subxacts[i - 1].xid == xid)
5319 114 : return;
5320 : }
5321 :
5322 : /* This is a new subxact, so we need to add it to the array. */
5323 38 : if (subxact_data.nsubxacts == 0)
5324 : {
5325 : MemoryContext oldctx;
5326 :
5327 16 : subxact_data.nsubxacts_max = 128;
5328 :
5329 : /*
5330 : * Allocate this memory for subxacts in per-stream context, see
5331 : * subxact_info_read.
5332 : */
5333 16 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
5334 16 : subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
5335 16 : MemoryContextSwitchTo(oldctx);
5336 : }
5337 22 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
5338 : {
5339 20 : subxact_data.nsubxacts_max *= 2;
5340 20 : subxacts = repalloc(subxacts,
5341 20 : subxact_data.nsubxacts_max * sizeof(SubXactInfo));
5342 : }
5343 :
5344 38 : subxacts[subxact_data.nsubxacts].xid = xid;
5345 :
5346 : /*
5347 : * Get the current offset of the stream file and store it as offset of
5348 : * this subxact.
5349 : */
5350 38 : BufFileTell(stream_fd,
5351 38 : &subxacts[subxact_data.nsubxacts].fileno,
5352 38 : &subxacts[subxact_data.nsubxacts].offset);
5353 :
5354 38 : subxact_data.nsubxacts++;
5355 38 : subxact_data.subxacts = subxacts;
5356 : }
5357 :
5358 : /* format filename for file containing the info about subxacts */
5359 : static inline void
5360 1490 : subxact_filename(char *path, Oid subid, TransactionId xid)
5361 : {
5362 1490 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5363 1490 : }
5364 :
5365 : /* format filename for file containing serialized changes */
5366 : static inline void
5367 874 : changes_filename(char *path, Oid subid, TransactionId xid)
5368 : {
5369 874 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5370 874 : }
5371 :
5372 : /*
5373 : * stream_cleanup_files
5374 : * Cleanup files for a subscription / toplevel transaction.
5375 : *
5376 : * Remove files with serialized changes and subxact info for a particular
5377 : * toplevel transaction. Each subscription has a separate set of files
5378 : * for any toplevel transaction.
5379 : */
5380 : void
5381 62 : stream_cleanup_files(Oid subid, TransactionId xid)
5382 : {
5383 : char path[MAXPGPATH];
5384 :
5385 : /* Delete the changes file. */
5386 62 : changes_filename(path, subid, xid);
5387 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
5388 :
5389 : /* Delete the subxact file, if it exists. */
5390 62 : subxact_filename(path, subid, xid);
5391 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5392 62 : }
5393 :
5394 : /*
5395 : * stream_open_file
5396 : * Open a file that we'll use to serialize changes for a toplevel
5397 : * transaction.
5398 : *
5399 : * Open a file for streamed changes from a toplevel transaction identified
5400 : * by stream_xid (global variable). If it's the first chunk of streamed
5401 : * changes for this transaction, create the buffile, otherwise open the
5402 : * previously created file.
5403 : */
5404 : static void
5405 724 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
5406 : {
5407 : char path[MAXPGPATH];
5408 : MemoryContext oldcxt;
5409 :
5410 : Assert(OidIsValid(subid));
5411 : Assert(TransactionIdIsValid(xid));
5412 : Assert(stream_fd == NULL);
5413 :
5414 :
5415 724 : changes_filename(path, subid, xid);
5416 724 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5417 :
5418 : /*
5419 : * Create/open the buffiles under the logical streaming context so that we
5420 : * have those files until stream stop.
5421 : */
5422 724 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
5423 :
5424 : /*
5425 : * If this is the first streamed segment, create the changes file.
5426 : * Otherwise, just open the file for writing, in append mode.
5427 : */
5428 724 : if (first_segment)
5429 64 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
5430 : path);
5431 : else
5432 : {
5433 : /*
5434 : * Open the file and seek to the end of the file because we always
5435 : * append the changes file.
5436 : */
5437 660 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
5438 : path, O_RDWR, false);
5439 660 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
5440 : }
5441 :
5442 724 : MemoryContextSwitchTo(oldcxt);
5443 724 : }
5444 :
5445 : /*
5446 : * stream_close_file
5447 : * Close the currently open file with streamed changes.
5448 : */
5449 : static void
5450 784 : stream_close_file(void)
5451 : {
5452 : Assert(stream_fd != NULL);
5453 :
5454 784 : BufFileClose(stream_fd);
5455 :
5456 784 : stream_fd = NULL;
5457 784 : }
5458 :
5459 : /*
5460 : * stream_write_change
5461 : * Serialize a change to a file for the current toplevel transaction.
5462 : *
5463 : * The change is serialized in a simple format, with length (not including
5464 : * the length), action code (identifying the message type) and message
5465 : * contents (without the subxact TransactionId value).
5466 : */
5467 : static void
5468 215108 : stream_write_change(char action, StringInfo s)
5469 : {
5470 : int len;
5471 :
5472 : Assert(stream_fd != NULL);
5473 :
5474 : /* total on-disk size, including the action type character */
5475 215108 : len = (s->len - s->cursor) + sizeof(char);
5476 :
5477 : /* first write the size */
5478 215108 : BufFileWrite(stream_fd, &len, sizeof(len));
5479 :
5480 : /* then the action */
5481 215108 : BufFileWrite(stream_fd, &action, sizeof(action));
5482 :
5483 : /* and finally the remaining part of the buffer (after the XID) */
5484 215108 : len = (s->len - s->cursor);
5485 :
5486 215108 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
5487 215108 : }
5488 :
5489 : /*
5490 : * stream_open_and_write_change
5491 : * Serialize a message to a file for the given transaction.
5492 : *
5493 : * This function is similar to stream_write_change except that it will open the
5494 : * target file if not already before writing the message and close the file at
5495 : * the end.
5496 : */
5497 : static void
5498 10 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
5499 : {
5500 : Assert(!in_streamed_transaction);
5501 :
5502 10 : if (!stream_fd)
5503 10 : stream_start_internal(xid, false);
5504 :
5505 10 : stream_write_change(action, s);
5506 10 : stream_stop_internal(xid);
5507 10 : }
5508 :
5509 : /*
5510 : * Sets streaming options including replication slot name and origin start
5511 : * position. Workers need these options for logical replication.
5512 : */
5513 : void
5514 844 : set_stream_options(WalRcvStreamOptions *options,
5515 : char *slotname,
5516 : XLogRecPtr *origin_startpos)
5517 : {
5518 : int server_version;
5519 :
5520 844 : options->logical = true;
5521 844 : options->startpoint = *origin_startpos;
5522 844 : options->slotname = slotname;
5523 :
5524 844 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
5525 844 : options->proto.logical.proto_version =
5526 844 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
5527 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
5528 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
5529 : LOGICALREP_PROTO_VERSION_NUM;
5530 :
5531 844 : options->proto.logical.publication_names = MySubscription->publications;
5532 844 : options->proto.logical.binary = MySubscription->binary;
5533 :
5534 : /*
5535 : * Assign the appropriate option value for streaming option according to
5536 : * the 'streaming' mode and the publisher's ability to support that mode.
5537 : */
5538 844 : if (server_version >= 160000 &&
5539 844 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
5540 : {
5541 776 : options->proto.logical.streaming_str = "parallel";
5542 776 : MyLogicalRepWorker->parallel_apply = true;
5543 : }
5544 68 : else if (server_version >= 140000 &&
5545 68 : MySubscription->stream != LOGICALREP_STREAM_OFF)
5546 : {
5547 52 : options->proto.logical.streaming_str = "on";
5548 52 : MyLogicalRepWorker->parallel_apply = false;
5549 : }
5550 : else
5551 : {
5552 16 : options->proto.logical.streaming_str = NULL;
5553 16 : MyLogicalRepWorker->parallel_apply = false;
5554 : }
5555 :
5556 844 : options->proto.logical.twophase = false;
5557 844 : options->proto.logical.origin = pstrdup(MySubscription->origin);
5558 844 : }
5559 :
5560 : /*
5561 : * Cleanup the memory for subxacts and reset the related variables.
5562 : */
5563 : static inline void
5564 750 : cleanup_subxact_info()
5565 : {
5566 750 : if (subxact_data.subxacts)
5567 174 : pfree(subxact_data.subxacts);
5568 :
5569 750 : subxact_data.subxacts = NULL;
5570 750 : subxact_data.subxact_last = InvalidTransactionId;
5571 750 : subxact_data.nsubxacts = 0;
5572 750 : subxact_data.nsubxacts_max = 0;
5573 750 : }
5574 :
5575 : /*
5576 : * Common function to run the apply loop with error handling. Disable the
5577 : * subscription, if necessary.
5578 : *
5579 : * Note that we don't handle FATAL errors which are probably because
5580 : * of system resource error and are not repeatable.
5581 : */
5582 : void
5583 844 : start_apply(XLogRecPtr origin_startpos)
5584 : {
5585 844 : PG_TRY();
5586 : {
5587 844 : LogicalRepApplyLoop(origin_startpos);
5588 : }
5589 186 : PG_CATCH();
5590 : {
5591 : /*
5592 : * Reset the origin state to prevent the advancement of origin
5593 : * progress if we fail to apply. Otherwise, this will result in
5594 : * transaction loss as that transaction won't be sent again by the
5595 : * server.
5596 : */
5597 186 : replorigin_reset(0, (Datum) 0);
5598 :
5599 186 : if (MySubscription->disableonerr)
5600 6 : DisableSubscriptionAndExit();
5601 : else
5602 : {
5603 : /*
5604 : * Report the worker failed while applying changes. Abort the
5605 : * current transaction so that the stats message is sent in an
5606 : * idle state.
5607 : */
5608 180 : AbortOutOfAnyTransaction();
5609 180 : pgstat_report_subscription_error(MySubscription->oid,
5610 180 : MyLogicalRepWorker->type);
5611 :
5612 180 : PG_RE_THROW();
5613 : }
5614 : }
5615 0 : PG_END_TRY();
5616 0 : }
5617 :
5618 : /*
5619 : * Runs the leader apply worker.
5620 : *
5621 : * It sets up replication origin, streaming options and then starts streaming.
5622 : */
5623 : static void
5624 568 : run_apply_worker()
5625 : {
5626 : char originname[NAMEDATALEN];
5627 568 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
5628 568 : char *slotname = NULL;
5629 : WalRcvStreamOptions options;
5630 : RepOriginId originid;
5631 : TimeLineID startpointTLI;
5632 : char *err;
5633 : bool must_use_password;
5634 :
5635 568 : slotname = MySubscription->slotname;
5636 :
5637 : /*
5638 : * This shouldn't happen if the subscription is enabled, but guard against
5639 : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5640 : * slot is NULL.)
5641 : */
5642 568 : if (!slotname)
5643 0 : ereport(ERROR,
5644 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5645 : errmsg("subscription has no replication slot set")));
5646 :
5647 : /* Setup replication origin tracking. */
5648 568 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
5649 : originname, sizeof(originname));
5650 568 : StartTransactionCommand();
5651 568 : originid = replorigin_by_name(originname, true);
5652 568 : if (!OidIsValid(originid))
5653 0 : originid = replorigin_create(originname);
5654 568 : replorigin_session_setup(originid, 0);
5655 568 : replorigin_session_origin = originid;
5656 568 : origin_startpos = replorigin_session_get_progress(false);
5657 568 : CommitTransactionCommand();
5658 :
5659 : /* Is the use of a password mandatory? */
5660 1090 : must_use_password = MySubscription->passwordrequired &&
5661 522 : !MySubscription->ownersuperuser;
5662 :
5663 568 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
5664 : true, must_use_password,
5665 : MySubscription->name, &err);
5666 :
5667 544 : if (LogRepWorkerWalRcvConn == NULL)
5668 68 : ereport(ERROR,
5669 : (errcode(ERRCODE_CONNECTION_FAILURE),
5670 : errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5671 : MySubscription->name, err)));
5672 :
5673 : /*
5674 : * We don't really use the output identify_system for anything but it does
5675 : * some initializations on the upstream so let's still call it.
5676 : */
5677 476 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
5678 :
5679 476 : set_apply_error_context_origin(originname);
5680 :
5681 476 : set_stream_options(&options, slotname, &origin_startpos);
5682 :
5683 : /*
5684 : * Even when the two_phase mode is requested by the user, it remains as
5685 : * the tri-state PENDING until all tablesyncs have reached READY state.
5686 : * Only then, can it become ENABLED.
5687 : *
5688 : * Note: If the subscription has no tables then leave the state as
5689 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5690 : * work.
5691 : */
5692 508 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
5693 32 : AllTablesyncsReady())
5694 : {
5695 : /* Start streaming with two_phase enabled */
5696 18 : options.proto.logical.twophase = true;
5697 18 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5698 :
5699 18 : StartTransactionCommand();
5700 :
5701 : /*
5702 : * Updating pg_subscription might involve TOAST table access, so
5703 : * ensure we have a valid snapshot.
5704 : */
5705 18 : PushActiveSnapshot(GetTransactionSnapshot());
5706 :
5707 18 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
5708 18 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
5709 18 : PopActiveSnapshot();
5710 18 : CommitTransactionCommand();
5711 : }
5712 : else
5713 : {
5714 458 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5715 : }
5716 :
5717 476 : ereport(DEBUG1,
5718 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5719 : MySubscription->name,
5720 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
5721 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
5722 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
5723 : "?")));
5724 :
5725 : /* Run the main loop. */
5726 476 : start_apply(origin_startpos);
5727 0 : }
5728 :
5729 : /*
5730 : * Common initialization for leader apply worker, parallel apply worker,
5731 : * tablesync worker and sequencesync worker.
5732 : *
5733 : * Initialize the database connection, in-memory subscription and necessary
5734 : * config options.
5735 : */
5736 : void
5737 1136 : InitializeLogRepWorker(void)
5738 : {
5739 : MemoryContext oldctx;
5740 :
5741 : /* Run as replica session replication role. */
5742 1136 : SetConfigOption("session_replication_role", "replica",
5743 : PGC_SUSET, PGC_S_OVERRIDE);
5744 :
5745 : /* Connect to our database. */
5746 1136 : BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
5747 1136 : MyLogicalRepWorker->userid,
5748 : 0);
5749 :
5750 : /*
5751 : * Set always-secure search path, so malicious users can't redirect user
5752 : * code (e.g. pg_index.indexprs).
5753 : */
5754 1128 : SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5755 :
5756 : /* Load the subscription into persistent memory context. */
5757 1128 : ApplyContext = AllocSetContextCreate(TopMemoryContext,
5758 : "ApplyContext",
5759 : ALLOCSET_DEFAULT_SIZES);
5760 1128 : StartTransactionCommand();
5761 1128 : oldctx = MemoryContextSwitchTo(ApplyContext);
5762 :
5763 : /*
5764 : * Lock the subscription to prevent it from being concurrently dropped,
5765 : * then re-verify its existence. After the initialization, the worker will
5766 : * be terminated gracefully if the subscription is dropped.
5767 : */
5768 1128 : LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
5769 : AccessShareLock);
5770 1126 : MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
5771 1126 : if (!MySubscription)
5772 : {
5773 126 : ereport(LOG,
5774 : (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5775 : MyLogicalRepWorker->subid)));
5776 :
5777 : /* Ensure we remove no-longer-useful entry for worker's start time */
5778 126 : if (am_leader_apply_worker())
5779 126 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5780 :
5781 126 : proc_exit(0);
5782 : }
5783 :
5784 1000 : MySubscriptionValid = true;
5785 1000 : MemoryContextSwitchTo(oldctx);
5786 :
5787 1000 : if (!MySubscription->enabled)
5788 : {
5789 0 : ereport(LOG,
5790 : (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5791 : MySubscription->name)));
5792 :
5793 0 : apply_worker_exit();
5794 : }
5795 :
5796 : /*
5797 : * Restart the worker if retain_dead_tuples was enabled during startup.
5798 : *
5799 : * At this point, the replication slot used for conflict detection might
5800 : * not exist yet, or could be dropped soon if the launcher perceives
5801 : * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5802 : * oldest_nonremovable_xid when the slot is absent or at risk of being
5803 : * dropped, a restart is initiated.
5804 : *
5805 : * The oldest_nonremovable_xid should be initialized only when the
5806 : * subscription's retention is active before launching the worker. See
5807 : * logicalrep_worker_launch.
5808 : */
5809 1000 : if (am_leader_apply_worker() &&
5810 568 : MySubscription->retaindeadtuples &&
5811 32 : MySubscription->retentionactive &&
5812 32 : !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
5813 : {
5814 0 : ereport(LOG,
5815 : errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5816 : MySubscription->name, "retain_dead_tuples"));
5817 :
5818 0 : apply_worker_exit();
5819 : }
5820 :
5821 : /* Setup synchronous commit according to the user's wishes */
5822 1000 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
5823 : PGC_BACKEND, PGC_S_OVERRIDE);
5824 :
5825 : /*
5826 : * Keep us informed about subscription or role changes. Note that the
5827 : * role's superuser privilege can be revoked.
5828 : */
5829 1000 : CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
5830 : subscription_change_cb,
5831 : (Datum) 0);
5832 :
5833 1000 : CacheRegisterSyscacheCallback(AUTHOID,
5834 : subscription_change_cb,
5835 : (Datum) 0);
5836 :
5837 1000 : if (am_tablesync_worker())
5838 394 : ereport(LOG,
5839 : errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5840 : MySubscription->name,
5841 : get_rel_name(MyLogicalRepWorker->relid)));
5842 606 : else if (am_sequencesync_worker())
5843 18 : ereport(LOG,
5844 : errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
5845 : MySubscription->name));
5846 : else
5847 588 : ereport(LOG,
5848 : errmsg("logical replication apply worker for subscription \"%s\" has started",
5849 : MySubscription->name));
5850 :
5851 1000 : CommitTransactionCommand();
5852 1000 : }
5853 :
5854 : /*
5855 : * Reset the origin state.
5856 : */
5857 : static void
5858 1166 : replorigin_reset(int code, Datum arg)
5859 : {
5860 1166 : replorigin_session_origin = InvalidRepOriginId;
5861 1166 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
5862 1166 : replorigin_session_origin_timestamp = 0;
5863 1166 : }
5864 :
5865 : /*
5866 : * Common function to setup the leader apply, tablesync and sequencesync worker.
5867 : */
5868 : void
5869 1116 : SetupApplyOrSyncWorker(int worker_slot)
5870 : {
5871 : /* Attach to slot */
5872 1116 : logicalrep_worker_attach(worker_slot);
5873 :
5874 : Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker());
5875 :
5876 : /* Setup signal handling */
5877 1116 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
5878 1116 : pqsignal(SIGTERM, die);
5879 1116 : BackgroundWorkerUnblockSignals();
5880 :
5881 : /*
5882 : * We don't currently need any ResourceOwner in a walreceiver process, but
5883 : * if we did, we could call CreateAuxProcessResourceOwner here.
5884 : */
5885 :
5886 : /* Initialise stats to a sanish value */
5887 1116 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
5888 1116 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
5889 :
5890 : /* Load the libpq-specific functions */
5891 1116 : load_file("libpqwalreceiver", false);
5892 :
5893 1116 : InitializeLogRepWorker();
5894 :
5895 : /*
5896 : * Register a callback to reset the origin state before aborting any
5897 : * pending transaction during shutdown (see ShutdownPostgres()). This will
5898 : * avoid origin advancement for an in-complete transaction which could
5899 : * otherwise lead to its loss as such a transaction won't be sent by the
5900 : * server again.
5901 : *
5902 : * Note that even a LOG or DEBUG statement placed after setting the origin
5903 : * state may process a shutdown signal before committing the current apply
5904 : * operation. So, it is important to register such a callback here.
5905 : */
5906 980 : before_shmem_exit(replorigin_reset, (Datum) 0);
5907 :
5908 : /* Connect to the origin and start the replication. */
5909 980 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5910 : MySubscription->conninfo);
5911 :
5912 : /*
5913 : * Setup callback for syscache so that we know when something changes in
5914 : * the subscription relation state.
5915 : */
5916 980 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
5917 : InvalidateSyncingRelStates,
5918 : (Datum) 0);
5919 980 : }
5920 :
5921 : /* Logical Replication Apply worker entry point */
5922 : void
5923 702 : ApplyWorkerMain(Datum main_arg)
5924 : {
5925 702 : int worker_slot = DatumGetInt32(main_arg);
5926 :
5927 702 : InitializingApplyWorker = true;
5928 :
5929 702 : SetupApplyOrSyncWorker(worker_slot);
5930 :
5931 568 : InitializingApplyWorker = false;
5932 :
5933 568 : run_apply_worker();
5934 :
5935 0 : proc_exit(0);
5936 : }
5937 :
5938 : /*
5939 : * After error recovery, disable the subscription in a new transaction
5940 : * and exit cleanly.
5941 : */
5942 : void
5943 8 : DisableSubscriptionAndExit(void)
5944 : {
5945 : /*
5946 : * Emit the error message, and recover from the error state to an idle
5947 : * state
5948 : */
5949 8 : HOLD_INTERRUPTS();
5950 :
5951 8 : EmitErrorReport();
5952 8 : AbortOutOfAnyTransaction();
5953 8 : FlushErrorState();
5954 :
5955 8 : RESUME_INTERRUPTS();
5956 :
5957 : /*
5958 : * Report the worker failed during sequence synchronization, table
5959 : * synchronization, or apply.
5960 : */
5961 8 : pgstat_report_subscription_error(MyLogicalRepWorker->subid,
5962 8 : MyLogicalRepWorker->type);
5963 :
5964 : /* Disable the subscription */
5965 8 : StartTransactionCommand();
5966 :
5967 : /*
5968 : * Updating pg_subscription might involve TOAST table access, so ensure we
5969 : * have a valid snapshot.
5970 : */
5971 8 : PushActiveSnapshot(GetTransactionSnapshot());
5972 :
5973 8 : DisableSubscription(MySubscription->oid);
5974 8 : PopActiveSnapshot();
5975 8 : CommitTransactionCommand();
5976 :
5977 : /* Ensure we remove no-longer-useful entry for worker's start time */
5978 8 : if (am_leader_apply_worker())
5979 6 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5980 :
5981 : /* Notify the subscription has been disabled and exit */
5982 8 : ereport(LOG,
5983 : errmsg("subscription \"%s\" has been disabled because of an error",
5984 : MySubscription->name));
5985 :
5986 : /*
5987 : * Skip the track_commit_timestamp check when disabling the worker due to
5988 : * an error, as verifying commit timestamps is unnecessary in this
5989 : * context.
5990 : */
5991 8 : CheckSubDeadTupleRetention(false, true, WARNING,
5992 8 : MySubscription->retaindeadtuples,
5993 8 : MySubscription->retentionactive, false);
5994 :
5995 8 : proc_exit(0);
5996 : }
5997 :
5998 : /*
5999 : * Is current process a logical replication worker?
6000 : */
6001 : bool
6002 4038 : IsLogicalWorker(void)
6003 : {
6004 4038 : return MyLogicalRepWorker != NULL;
6005 : }
6006 :
6007 : /*
6008 : * Is current process a logical replication parallel apply worker?
6009 : */
6010 : bool
6011 2768 : IsLogicalParallelApplyWorker(void)
6012 : {
6013 2768 : return IsLogicalWorker() && am_parallel_apply_worker();
6014 : }
6015 :
6016 : /*
6017 : * Start skipping changes of the transaction if the given LSN matches the
6018 : * LSN specified by subscription's skiplsn.
6019 : */
6020 : static void
6021 1084 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
6022 : {
6023 : Assert(!is_skipping_changes());
6024 : Assert(!in_remote_transaction);
6025 : Assert(!in_streamed_transaction);
6026 :
6027 : /*
6028 : * Quick return if it's not requested to skip this transaction. This
6029 : * function is called for every remote transaction and we assume that
6030 : * skipping the transaction is not used often.
6031 : */
6032 1084 : if (likely(!XLogRecPtrIsValid(MySubscription->skiplsn) ||
6033 : MySubscription->skiplsn != finish_lsn))
6034 1078 : return;
6035 :
6036 : /* Start skipping all changes of this transaction */
6037 6 : skip_xact_finish_lsn = finish_lsn;
6038 :
6039 6 : ereport(LOG,
6040 : errmsg("logical replication starts skipping transaction at LSN %X/%08X",
6041 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6042 : }
6043 :
6044 : /*
6045 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
6046 : */
6047 : static void
6048 54 : stop_skipping_changes(void)
6049 : {
6050 54 : if (!is_skipping_changes())
6051 48 : return;
6052 :
6053 6 : ereport(LOG,
6054 : errmsg("logical replication completed skipping transaction at LSN %X/%08X",
6055 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
6056 :
6057 : /* Stop skipping changes */
6058 6 : skip_xact_finish_lsn = InvalidXLogRecPtr;
6059 : }
6060 :
6061 : /*
6062 : * Clear subskiplsn of pg_subscription catalog.
6063 : *
6064 : * finish_lsn is the transaction's finish LSN that is used to check if the
6065 : * subskiplsn matches it. If not matched, we raise a warning when clearing the
6066 : * subskiplsn in order to inform users for cases e.g., where the user mistakenly
6067 : * specified the wrong subskiplsn.
6068 : */
6069 : static void
6070 1050 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
6071 : {
6072 : Relation rel;
6073 : Form_pg_subscription subform;
6074 : HeapTuple tup;
6075 1050 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
6076 1050 : bool started_tx = false;
6077 :
6078 1050 : if (likely(!XLogRecPtrIsValid(myskiplsn)) || am_parallel_apply_worker())
6079 1044 : return;
6080 :
6081 6 : if (!IsTransactionState())
6082 : {
6083 2 : StartTransactionCommand();
6084 2 : started_tx = true;
6085 : }
6086 :
6087 : /*
6088 : * Updating pg_subscription might involve TOAST table access, so ensure we
6089 : * have a valid snapshot.
6090 : */
6091 6 : PushActiveSnapshot(GetTransactionSnapshot());
6092 :
6093 : /*
6094 : * Protect subskiplsn of pg_subscription from being concurrently updated
6095 : * while clearing it.
6096 : */
6097 6 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
6098 : AccessShareLock);
6099 :
6100 6 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
6101 :
6102 : /* Fetch the existing tuple. */
6103 6 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
6104 : ObjectIdGetDatum(MySubscription->oid));
6105 :
6106 6 : if (!HeapTupleIsValid(tup))
6107 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
6108 :
6109 6 : subform = (Form_pg_subscription) GETSTRUCT(tup);
6110 :
6111 : /*
6112 : * Clear the subskiplsn. If the user has already changed subskiplsn before
6113 : * clearing it we don't update the catalog and the replication origin
6114 : * state won't get advanced. So in the worst case, if the server crashes
6115 : * before sending an acknowledgment of the flush position the transaction
6116 : * will be sent again and the user needs to set subskiplsn again. We can
6117 : * reduce the possibility by logging a replication origin WAL record to
6118 : * advance the origin LSN instead but there is no way to advance the
6119 : * origin timestamp and it doesn't seem to be worth doing anything about
6120 : * it since it's a very rare case.
6121 : */
6122 6 : if (subform->subskiplsn == myskiplsn)
6123 : {
6124 : bool nulls[Natts_pg_subscription];
6125 : bool replaces[Natts_pg_subscription];
6126 : Datum values[Natts_pg_subscription];
6127 :
6128 6 : memset(values, 0, sizeof(values));
6129 6 : memset(nulls, false, sizeof(nulls));
6130 6 : memset(replaces, false, sizeof(replaces));
6131 :
6132 : /* reset subskiplsn */
6133 6 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
6134 6 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
6135 :
6136 6 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
6137 : replaces);
6138 6 : CatalogTupleUpdate(rel, &tup->t_self, tup);
6139 :
6140 6 : if (myskiplsn != finish_lsn)
6141 0 : ereport(WARNING,
6142 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
6143 : errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
6144 : LSN_FORMAT_ARGS(finish_lsn),
6145 : LSN_FORMAT_ARGS(myskiplsn)));
6146 : }
6147 :
6148 6 : heap_freetuple(tup);
6149 6 : table_close(rel, NoLock);
6150 :
6151 6 : PopActiveSnapshot();
6152 :
6153 6 : if (started_tx)
6154 2 : CommitTransactionCommand();
6155 : }
6156 :
6157 : /* Error callback to give more context info about the change being applied */
6158 : void
6159 16390 : apply_error_callback(void *arg)
6160 : {
6161 16390 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
6162 :
6163 16390 : if (apply_error_callback_arg.command == 0)
6164 14198 : return;
6165 :
6166 : Assert(errarg->origin_name);
6167 :
6168 2192 : if (errarg->rel == NULL)
6169 : {
6170 668 : if (!TransactionIdIsValid(errarg->remote_xid))
6171 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
6172 : errarg->origin_name,
6173 : logicalrep_message_type(errarg->command));
6174 668 : else if (!XLogRecPtrIsValid(errarg->finish_lsn))
6175 528 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
6176 : errarg->origin_name,
6177 : logicalrep_message_type(errarg->command),
6178 : errarg->remote_xid);
6179 : else
6180 280 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
6181 : errarg->origin_name,
6182 : logicalrep_message_type(errarg->command),
6183 : errarg->remote_xid,
6184 140 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6185 : }
6186 : else
6187 : {
6188 1524 : if (errarg->remote_attnum < 0)
6189 : {
6190 1524 : if (!XLogRecPtrIsValid(errarg->finish_lsn))
6191 2720 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
6192 : errarg->origin_name,
6193 : logicalrep_message_type(errarg->command),
6194 1360 : errarg->rel->remoterel.nspname,
6195 1360 : errarg->rel->remoterel.relname,
6196 : errarg->remote_xid);
6197 : else
6198 328 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
6199 : errarg->origin_name,
6200 : logicalrep_message_type(errarg->command),
6201 164 : errarg->rel->remoterel.nspname,
6202 164 : errarg->rel->remoterel.relname,
6203 : errarg->remote_xid,
6204 164 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6205 : }
6206 : else
6207 : {
6208 0 : if (!XLogRecPtrIsValid(errarg->finish_lsn))
6209 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
6210 : errarg->origin_name,
6211 : logicalrep_message_type(errarg->command),
6212 0 : errarg->rel->remoterel.nspname,
6213 0 : errarg->rel->remoterel.relname,
6214 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6215 : errarg->remote_xid);
6216 : else
6217 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
6218 : errarg->origin_name,
6219 : logicalrep_message_type(errarg->command),
6220 0 : errarg->rel->remoterel.nspname,
6221 0 : errarg->rel->remoterel.relname,
6222 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
6223 : errarg->remote_xid,
6224 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
6225 : }
6226 : }
6227 : }
6228 :
6229 : /* Set transaction information of apply error callback */
6230 : static inline void
6231 5810 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
6232 : {
6233 5810 : apply_error_callback_arg.remote_xid = xid;
6234 5810 : apply_error_callback_arg.finish_lsn = lsn;
6235 5810 : }
6236 :
6237 : /* Reset all information of apply error callback */
6238 : static inline void
6239 2834 : reset_apply_error_context_info(void)
6240 : {
6241 2834 : apply_error_callback_arg.command = 0;
6242 2834 : apply_error_callback_arg.rel = NULL;
6243 2834 : apply_error_callback_arg.remote_attnum = -1;
6244 2834 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
6245 2834 : }
6246 :
6247 : /*
6248 : * Request wakeup of the workers for the given subscription OID
6249 : * at commit of the current transaction.
6250 : *
6251 : * This is used to ensure that the workers process assorted changes
6252 : * as soon as possible.
6253 : */
6254 : void
6255 448 : LogicalRepWorkersWakeupAtCommit(Oid subid)
6256 : {
6257 : MemoryContext oldcxt;
6258 :
6259 448 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
6260 448 : on_commit_wakeup_workers_subids =
6261 448 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
6262 448 : MemoryContextSwitchTo(oldcxt);
6263 448 : }
6264 :
6265 : /*
6266 : * Wake up the workers of any subscriptions that were changed in this xact.
6267 : */
6268 : void
6269 1033538 : AtEOXact_LogicalRepWorkers(bool isCommit)
6270 : {
6271 1033538 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
6272 : {
6273 : ListCell *lc;
6274 :
6275 438 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
6276 876 : foreach(lc, on_commit_wakeup_workers_subids)
6277 : {
6278 438 : Oid subid = lfirst_oid(lc);
6279 : List *workers;
6280 : ListCell *lc2;
6281 :
6282 438 : workers = logicalrep_workers_find(subid, true, false);
6283 576 : foreach(lc2, workers)
6284 : {
6285 138 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
6286 :
6287 138 : logicalrep_worker_wakeup_ptr(worker);
6288 : }
6289 : }
6290 438 : LWLockRelease(LogicalRepWorkerLock);
6291 : }
6292 :
6293 : /* The List storage will be reclaimed automatically in xact cleanup. */
6294 1033538 : on_commit_wakeup_workers_subids = NIL;
6295 1033538 : }
6296 :
6297 : /*
6298 : * Allocate the origin name in long-lived context for error context message.
6299 : */
6300 : void
6301 864 : set_apply_error_context_origin(char *originname)
6302 : {
6303 864 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
6304 : originname);
6305 864 : }
6306 :
6307 : /*
6308 : * Return the action to be taken for the given transaction. See
6309 : * TransApplyAction for information on each of the actions.
6310 : *
6311 : * *winfo is assigned to the destination parallel worker info when the leader
6312 : * apply worker has to pass all the transaction's changes to the parallel
6313 : * apply worker.
6314 : */
6315 : static TransApplyAction
6316 652764 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
6317 : {
6318 652764 : *winfo = NULL;
6319 :
6320 652764 : if (am_parallel_apply_worker())
6321 : {
6322 138052 : return TRANS_PARALLEL_APPLY;
6323 : }
6324 :
6325 : /*
6326 : * If we are processing this transaction using a parallel apply worker
6327 : * then either we send the changes to the parallel worker or if the worker
6328 : * is busy then serialize the changes to the file which will later be
6329 : * processed by the parallel worker.
6330 : */
6331 514712 : *winfo = pa_find_worker(xid);
6332 :
6333 514712 : if (*winfo && (*winfo)->serialize_changes)
6334 : {
6335 10074 : return TRANS_LEADER_PARTIAL_SERIALIZE;
6336 : }
6337 504638 : else if (*winfo)
6338 : {
6339 137792 : return TRANS_LEADER_SEND_TO_PARALLEL;
6340 : }
6341 :
6342 : /*
6343 : * If there is no parallel worker involved to process this transaction
6344 : * then we either directly apply the change or serialize it to a file
6345 : * which will later be applied when the transaction finish message is
6346 : * processed.
6347 : */
6348 366846 : else if (in_streamed_transaction)
6349 : {
6350 206394 : return TRANS_LEADER_SERIALIZE;
6351 : }
6352 : else
6353 : {
6354 160452 : return TRANS_LEADER_APPLY;
6355 : }
6356 : }
|