Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from the remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by the logical replication worker
15 : * launcher for every enabled subscription in a database. It uses the
16 : * walsender protocol to communicate with the publisher.
17 : *
18 : * This module includes server-facing code and shares the libpqwalreceiver
19 : * module with the walreceiver to provide the libpq-specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option to on.
31 : *
32 : * Unlike the regular (non-streamed) case, streamed transactions require us
33 : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which are then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in the temporary-file directory by default, and the
38 : * filenames include both the XID of the toplevel transaction and the OID of
39 : * the subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID don't interfere with each other.
41 : *
42 : * We use BufFiles instead of plain temporary files because (a) the BufFile
43 : * infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) it provides automatic cleanup on error, and (c) it allows these
45 : * files to survive across local transactions so that they can be opened and
46 : * closed at stream start and stop. We decided to use the FileSet
47 : * infrastructure because without it the files are deleted as soon as they
48 : * are closed, while keeping the stream files open across start/stop of a
49 : * stream would consume a lot of memory (more than 8K for each BufFile, and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we didn't use FileSet we would also need to invent
53 : * a new way to pass filenames to the BufFile APIs so that we could reopen
54 : * the desired file across multiple stream-open calls for the same
55 : * transaction.
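 *
 * A simplified sketch of this pattern, mirroring stream_open_file() defined
 * later in this file (error handling and memory-context switches elided;
 * stream_fileset stands for the FileSet kept in the worker's shared state):
 *
 *     changes_filename(path, subid, xid);
 *     if (first_segment)
 *         stream_fd = BufFileCreateFileSet(stream_fileset, path);
 *     else
 *         stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDWR, false);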
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option to parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two-phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible for a prepared transaction to arrive at the apply worker while
68 : * the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition in should_apply_changes_for_rel). The
71 : * tablesync worker might not receive such a prepared transaction, because,
72 : * say, it was prior to the initial consistent point, but might have received
73 : * some later commits. The tablesync worker will then exit without doing
74 : * anything for the prepared transaction skipped by the apply worker, as its
75 : * sync location will already be ahead of the apply worker's current location.
76 : * This would lead to an "empty prepare", because later, when the apply worker
77 : * does the commit prepared, there is nothing in it (the inserts were skipped).
78 : *
79 : * To avoid this and similar confusion around prepares, the subscription's
80 : * two_phase commit is enabled only after the initial sync is over. The
81 : * two_phase option has been implemented as a tri-state with values DISABLED,
82 : * PENDING, and ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * process_syncing_tables_for_apply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING, it starts streaming messages with the
98 : * two_phase option, which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that while two_phase was not yet enabled, the
101 : * publisher (replication server) skipped some prepares, but we ensure that
102 : * such prepares are sent along with the commit prepared; see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * Finally, to avoid the problems mentioned in the previous paragraphs from any
113 : * subsequent (not-READY) tablesyncs (which would require toggling the two_phase
114 : * option from 'on' to 'off' and then back to 'on' again) there is a restriction
115 : * on ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 : * the two_phase tri-state is ENABLED, except when copy_data = false.
117 : *
118 : * We can receive a prepare for the same GID more than once in the genuine case
119 : * where we have defined multiple subscriptions for publications on the same
120 : * server and the prepared transaction has operations on tables subscribed to
121 : * those subscriptions. In such cases, if we used the GID sent by the publisher,
122 : * one of the prepares would succeed and the others would fail, in which case
123 : * the server would send them again. Now, this can lead to a deadlock if the
124 : * user has set synchronous_standby_names for all the subscriptions on the
125 : * subscriber. To avoid such deadlocks, we generate a unique GID (consisting of
126 : * the subscription OID and the XID of the prepared transaction) for each
127 : * prepared transaction on the subscriber.
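 *
 * A minimal sketch of how that unique GID is built; this mirrors the call in
 * apply_handle_prepare_internal() below, where TwoPhaseTransactionGid() fills
 * the buffer from the subscription OID and the remote XID:
 *
 *     char gid[GIDSIZE];
 *
 *     TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
 *                            gid, sizeof(gid));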
128 : *
129 : * FAILOVER
130 : * ----------------------
131 : * The logical slot on the primary can be synced to the standby by specifying
132 : * failover = true when creating the subscription. Enabling failover allows us
133 : * to smoothly transition to the promoted standby, ensuring that we can
134 : * subscribe to the new primary without losing any data.
135 : *
136 : * RETAIN DEAD TUPLES
137 : * ----------------------
138 : * Each apply worker that has enabled the retain_dead_tuples option maintains a
139 : * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
140 : * prevent dead rows from being removed prematurely when the apply worker still
141 : * needs them to detect update_deleted conflicts. Additionally, this helps to
142 : * retain the required commit_ts module information, which further helps to
143 : * detect update_origin_differs and delete_origin_differs conflicts reliably, as
144 : * otherwise, vacuum freeze could remove the required information.
145 : *
146 : * The logical replication launcher manages an internal replication slot named
147 : * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 : * transaction ID from all apply workers to determine the appropriate xmin for
149 : * the slot, thereby retaining necessary tuples.
150 : *
151 : * The non-removable transaction ID in the apply worker is advanced to the
152 : * oldest running transaction ID once all concurrent transactions on the
153 : * publisher have been applied and flushed locally. The process involves:
154 : *
155 : * - RDT_GET_CANDIDATE_XID:
156 : * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 : * candidate xid.
158 : *
159 : * - RDT_REQUEST_PUBLISHER_STATUS:
160 : * Send a message to the walsender requesting the publisher status, which
161 : * includes the latest WAL write position and information about transactions
162 : * that are in the commit phase.
163 : *
164 : * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 : * Wait for the status from the walsender. After receiving the first status,
166 : * do not proceed if there are concurrent remote transactions that are still
167 : * in the commit phase. These transactions might have been assigned an
168 : * earlier commit timestamp but have not yet written the commit WAL record.
169 : * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 : * until all these transactions have completed.
171 : *
172 : * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 : * Advance the non-removable transaction ID if the current flush location has
174 : * reached or surpassed the last received WAL position.
175 : *
176 : * The overall state progression is: GET_CANDIDATE_XID ->
177 : * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
178 : * REQUEST_PUBLISHER_STATUS until concurrent remote transactions end) ->
179 : * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
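 *
 * A simplified sketch of the phase dispatch implemented later in this file
 * (see process_rdt_phase_transition(); rdt_data and status_received are that
 * function's arguments, and re-entry details are elided):
 *
 *     switch (rdt_data->phase)
 *     {
 *         case RDT_GET_CANDIDATE_XID:
 *             get_candidate_xid(rdt_data);
 *             break;
 *         case RDT_REQUEST_PUBLISHER_STATUS:
 *             request_publisher_status(rdt_data);
 *             break;
 *         case RDT_WAIT_FOR_PUBLISHER_STATUS:
 *             wait_for_publisher_status(rdt_data, status_received);
 *             break;
 *         case RDT_WAIT_FOR_LOCAL_FLUSH:
 *             wait_for_local_flush(rdt_data);
 *             break;
 *     }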
180 : *
181 : * Retaining the dead tuples for this period is sufficient for ensuring
182 : * eventual consistency using the last-update-wins strategy, as dead tuples are
183 : * useful for detecting conflicts only during the application of concurrent
184 : * transactions from remote nodes. After applying and flushing all remote
185 : * transactions that occurred concurrently with the tuple DELETE, any
186 : * subsequent UPDATE from a remote node should have a later timestamp. In such
187 : * cases, it is acceptable to detect an update_missing scenario and convert the
188 : * UPDATE to an INSERT when applying it. But, for concurrent remote
189 : * transactions with earlier timestamps than the DELETE, detecting
190 : * update_deleted is necessary, as the UPDATEs in remote transactions should be
191 : * ignored if their timestamp is earlier than that of the dead tuples.
192 : *
193 : * Note that advancing the non-removable transaction ID is not supported if the
194 : * publisher is also a physical standby. This is because the logical walsender
195 : * on the standby can only get the WAL replay position, but there may be more
196 : * WAL still being replicated from the primary, and that WAL could contain
197 : * commits with earlier timestamps.
198 : *
199 : * Similarly, when the publisher has subscribed to another publisher,
200 : * information necessary for conflict detection cannot be retained for
201 : * changes from origins other than the publisher. This is because the publisher
202 : * lacks the information on concurrent transactions of other publishers to
203 : * which it subscribes. As the information on concurrent transactions is
204 : * unavailable beyond the subscriber's immediate publishers, the non-removable
205 : * transaction ID might be advanced prematurely before changes from other
206 : * origins have been fully applied.
207 : *
208 : * XXX Retaining information for changes from other origins might be possible
209 : * by requesting the subscription on that origin to enable retain_dead_tuples
210 : * and fetching the conflict detection slot.xmin along with the publisher's
211 : * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
212 : * wait for the remote slot's xmin to reach the oldest active transaction ID,
213 : * ensuring that all transactions from other origins have been applied on the
214 : * publisher, thereby getting the latest WAL position that includes all
215 : * concurrent changes. However, this approach may impact performance, so it
216 : * might not be worth the effort.
217 : *
218 : * XXX It seems feasible to get the latest commit's WAL location from the
219 : * publisher and wait until that is applied. However, we can't do that
220 : * because commit timestamps can regress, as a commit with a later LSN is not
221 : * guaranteed to have a later timestamp than those with earlier LSNs. Having
222 : * said that, even if that were possible, it wouldn't improve performance much,
223 : * as the apply always lags and moves slowly compared with the transactions
224 : * on the publisher.
225 : *-------------------------------------------------------------------------
226 : */
227 :
228 : #include "postgres.h"
229 :
230 : #include <sys/stat.h>
231 : #include <unistd.h>
232 :
233 : #include "access/commit_ts.h"
234 : #include "access/table.h"
235 : #include "access/tableam.h"
236 : #include "access/twophase.h"
237 : #include "access/xact.h"
238 : #include "catalog/indexing.h"
239 : #include "catalog/pg_inherits.h"
240 : #include "catalog/pg_subscription.h"
241 : #include "catalog/pg_subscription_rel.h"
242 : #include "commands/subscriptioncmds.h"
243 : #include "commands/tablecmds.h"
244 : #include "commands/trigger.h"
245 : #include "executor/executor.h"
246 : #include "executor/execPartition.h"
247 : #include "libpq/pqformat.h"
248 : #include "miscadmin.h"
249 : #include "optimizer/optimizer.h"
250 : #include "parser/parse_relation.h"
251 : #include "pgstat.h"
252 : #include "postmaster/bgworker.h"
253 : #include "postmaster/interrupt.h"
254 : #include "postmaster/walwriter.h"
255 : #include "replication/conflict.h"
256 : #include "replication/logicallauncher.h"
257 : #include "replication/logicalproto.h"
258 : #include "replication/logicalrelation.h"
259 : #include "replication/logicalworker.h"
260 : #include "replication/origin.h"
261 : #include "replication/slot.h"
262 : #include "replication/walreceiver.h"
263 : #include "replication/worker_internal.h"
264 : #include "rewrite/rewriteHandler.h"
265 : #include "storage/buffile.h"
266 : #include "storage/ipc.h"
267 : #include "storage/lmgr.h"
268 : #include "storage/procarray.h"
269 : #include "tcop/tcopprot.h"
270 : #include "utils/acl.h"
271 : #include "utils/dynahash.h"
272 : #include "utils/guc.h"
273 : #include "utils/inval.h"
274 : #include "utils/lsyscache.h"
275 : #include "utils/memutils.h"
276 : #include "utils/pg_lsn.h"
277 : #include "utils/rel.h"
278 : #include "utils/rls.h"
279 : #include "utils/snapmgr.h"
280 : #include "utils/syscache.h"
281 : #include "utils/usercontext.h"
282 :
283 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
284 :
285 : typedef struct FlushPosition
286 : {
287 : dlist_node node;
288 : XLogRecPtr local_end;
289 : XLogRecPtr remote_end;
290 : } FlushPosition;
291 :
292 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
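
/*
 * Illustrative sketch of how entries are added to lsn_mapping. The real
 * helper, store_flush_position(), is called from the commit/prepare handlers
 * below; memory-context handling is elided here:
 *
 *     FlushPosition *flushpos = palloc(sizeof(FlushPosition));
 *
 *     flushpos->local_end = local_lsn;
 *     flushpos->remote_end = remote_lsn;
 *     dlist_push_tail(&lsn_mapping, &flushpos->node);
 *
 * The feedback machinery later walks this list to determine the highest
 * remote LSN whose local_end has already been flushed to disk.
 */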
293 :
294 : typedef struct ApplyExecutionData
295 : {
296 : EState *estate; /* executor state, used to track resources */
297 :
298 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
299 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
300 :
301 : /* These fields are used when the target relation is partitioned: */
302 : ModifyTableState *mtstate; /* dummy ModifyTable state */
303 : PartitionTupleRouting *proute; /* partition routing info */
304 : } ApplyExecutionData;
305 :
306 : /* Struct for saving and restoring apply errcontext information */
307 : typedef struct ApplyErrorCallbackArg
308 : {
309 : LogicalRepMsgType command; /* 0 if invalid */
310 : LogicalRepRelMapEntry *rel;
311 :
312 : /* Remote node information */
313 : int remote_attnum; /* -1 if invalid */
314 : TransactionId remote_xid;
315 : XLogRecPtr finish_lsn;
316 : char *origin_name;
317 : } ApplyErrorCallbackArg;
318 :
319 : /*
320 : * The action to be taken for the changes in the transaction.
321 : *
322 : * TRANS_LEADER_APPLY:
323 : * This action means that we are in the leader apply worker or table sync
324 : * worker. The changes of the transaction are either directly applied or
325 : * are read from temporary files (for streaming transactions) and then
326 : * applied by the worker.
327 : *
328 : * TRANS_LEADER_SERIALIZE:
329 : * This action means that we are in the leader apply worker or table sync
330 : * worker. Changes are written to temporary files and then applied when the
331 : * final commit arrives.
332 : *
333 : * TRANS_LEADER_SEND_TO_PARALLEL:
334 : * This action means that we are in the leader apply worker and need to send
335 : * the changes to the parallel apply worker.
336 : *
337 : * TRANS_LEADER_PARTIAL_SERIALIZE:
338 : * This action means that we are in the leader apply worker and have sent some
339 : * changes directly to the parallel apply worker and the remaining changes are
340 : * serialized to a file, due to timeout while sending data. The parallel apply
341 : * worker will apply these serialized changes when the final commit arrives.
342 : *
343 : * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
344 : * serializing changes, the leader worker also needs to serialize the
345 : * STREAM_XXX message to a file, and wait for the parallel apply worker to
346 : * finish the transaction when processing the transaction finish command. So
347 : * this new action was introduced to keep the code and logic clear.
348 : *
349 : * TRANS_PARALLEL_APPLY:
350 : * This action means that we are in the parallel apply worker and changes of
351 : * the transaction are applied directly by the worker.
352 : */
353 : typedef enum
354 : {
355 : /* The action for non-streaming transactions. */
356 : TRANS_LEADER_APPLY,
357 :
358 : /* Actions for streaming transactions. */
359 : TRANS_LEADER_SERIALIZE,
360 : TRANS_LEADER_SEND_TO_PARALLEL,
361 : TRANS_LEADER_PARTIAL_SERIALIZE,
362 : TRANS_PARALLEL_APPLY,
363 : } TransApplyAction;
364 :
365 : /*
366 : * The phases involved in advancing the non-removable transaction ID.
367 : *
368 : * See comments atop worker.c for details of the transition between these
369 : * phases.
370 : */
371 : typedef enum
372 : {
373 : RDT_GET_CANDIDATE_XID,
374 : RDT_REQUEST_PUBLISHER_STATUS,
375 : RDT_WAIT_FOR_PUBLISHER_STATUS,
376 : RDT_WAIT_FOR_LOCAL_FLUSH
377 : } RetainDeadTuplesPhase;
378 :
379 : /*
380 : * Critical information for managing phase transitions within the
381 : * RetainDeadTuplesPhase.
382 : */
383 : typedef struct RetainDeadTuplesData
384 : {
385 : RetainDeadTuplesPhase phase; /* current phase */
386 : XLogRecPtr remote_lsn; /* WAL write position on the publisher */
387 :
388 : /*
389 : * Oldest transaction ID that was in the commit phase on the publisher.
390 : * Use FullTransactionId to prevent issues with transaction ID wraparound,
391 : * where a new remote_oldestxid could falsely appear to originate from the
392 : * past and block advancement.
393 : */
394 : FullTransactionId remote_oldestxid;
395 :
396 : /*
397 : * Next transaction ID to be assigned on the publisher. Use
398 : * FullTransactionId for consistency and to allow straightforward
399 : * comparisons with remote_oldestxid.
400 : */
401 : FullTransactionId remote_nextxid;
402 :
403 : TimestampTz reply_time; /* when the publisher responds with status */
404 :
405 : /*
406 : * Publisher transaction ID that must be awaited to complete before
407 : * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
408 : * FullTransactionId for the same reason as remote_nextxid.
409 : */
410 : FullTransactionId remote_wait_for;
411 :
412 : TransactionId candidate_xid; /* candidate for the non-removable
413 : * transaction ID */
414 : TimestampTz flushpos_update_time; /* when the remote flush position was
415 : * updated in final phase
416 : * (RDT_WAIT_FOR_LOCAL_FLUSH) */
417 :
418 : /*
419 : * The following fields are used to determine the timing for the next
420 : * round of transaction ID advancement.
421 : */
422 : TimestampTz last_recv_time; /* when the last message was received */
423 : TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
424 : int xid_advance_interval; /* how much time (ms) to wait before
425 : * attempting to advance the
426 : * non-removable transaction ID */
427 : } RetainDeadTuplesData;
428 :
429 : /*
430 : * The minimum (100ms) and maximum (3 minutes) intervals for advancing
431 : * non-removable transaction IDs. The maximum interval is a bit arbitrary but
432 : * is sufficient to not cause any undue network traffic.
433 : */
434 : #define MIN_XID_ADVANCE_INTERVAL 100
435 : #define MAX_XID_ADVANCE_INTERVAL 180000
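
/*
 * Illustrative only: one plausible way to back off within these bounds when
 * no new transaction ID is found. The actual policy lives in
 * adjust_xid_advance_interval() below and may differ; treat this sketch as
 * an assumption, not a description of that function:
 *
 *     if (!new_xid_found)
 *         rdt_data->xid_advance_interval =
 *             Min(rdt_data->xid_advance_interval * 2, MAX_XID_ADVANCE_INTERVAL);
 *     else
 *         rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
 */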
436 :
437 : /* errcontext tracker */
438 : static ApplyErrorCallbackArg apply_error_callback_arg =
439 : {
440 : .command = 0,
441 : .rel = NULL,
442 : .remote_attnum = -1,
443 : .remote_xid = InvalidTransactionId,
444 : .finish_lsn = InvalidXLogRecPtr,
445 : .origin_name = NULL,
446 : };
447 :
448 : ErrorContextCallback *apply_error_context_stack = NULL;
449 :
450 : MemoryContext ApplyMessageContext = NULL;
451 : MemoryContext ApplyContext = NULL;
452 :
453 : /* per stream context for streaming transactions */
454 : static MemoryContext LogicalStreamingContext = NULL;
455 :
456 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
457 :
458 : Subscription *MySubscription = NULL;
459 : static bool MySubscriptionValid = false;
460 :
461 : static List *on_commit_wakeup_workers_subids = NIL;
462 :
463 : bool in_remote_transaction = false;
464 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
465 :
466 : /* fields valid only when processing streamed transaction */
467 : static bool in_streamed_transaction = false;
468 :
469 : static TransactionId stream_xid = InvalidTransactionId;
470 :
471 : /*
472 : * The number of changes applied by parallel apply worker during one streaming
473 : * block.
474 : */
475 : static uint32 parallel_stream_nchanges = 0;
476 :
477 : /* Are we initializing an apply worker? */
478 : bool InitializingApplyWorker = false;
479 :
480 : /*
481 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
482 : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
483 : * Once we start skipping changes, we don't stop until we have skipped all changes
484 : * of the transaction, even if pg_subscription is updated and MySubscription->skiplsn
485 : * gets changed or reset in the meantime. Also, in streaming transaction cases
486 : * (streaming = on), we don't skip receiving and spooling the changes, since we
487 : * decide whether or not to skip applying them when we start to apply changes.
488 : * The subskiplsn is cleared after successfully skipping the transaction or
489 : * applying a non-empty transaction. The latter prevents a mistakenly specified
490 : * subskiplsn from being left behind. Note that we cannot skip streaming
491 : * transactions when using parallel apply workers, because we cannot get the
492 : * finish LSN before applying the changes. So, we don't start a parallel apply
493 : * worker when the finish LSN is set by the user.
494 : */
495 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
496 : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
497 :
498 : /* BufFile handle of the current streaming file */
499 : static BufFile *stream_fd = NULL;
500 :
501 : /*
502 : * The remote WAL position that has been applied and flushed locally. We record
503 : * and use this information both while sending feedback to the server and
504 : * advancing oldest_nonremovable_xid.
505 : */
506 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
507 :
508 : typedef struct SubXactInfo
509 : {
510 : TransactionId xid; /* XID of the subxact */
511 : int fileno; /* file number in the buffile */
512 : off_t offset; /* offset in the file */
513 : } SubXactInfo;
514 :
515 : /* Sub-transaction data for the current streaming transaction */
516 : typedef struct ApplySubXactData
517 : {
518 : uint32 nsubxacts; /* number of sub-transactions */
519 : uint32 nsubxacts_max; /* current capacity of subxacts */
520 : TransactionId subxact_last; /* xid of the last sub-transaction */
521 : SubXactInfo *subxacts; /* sub-xact offset in changes file */
522 : } ApplySubXactData;
523 :
524 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
525 :
526 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
527 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
528 :
529 : /*
530 : * Information about subtransactions of a given toplevel transaction.
531 : */
532 : static void subxact_info_write(Oid subid, TransactionId xid);
533 : static void subxact_info_read(Oid subid, TransactionId xid);
534 : static void subxact_info_add(TransactionId xid);
535 : static inline void cleanup_subxact_info(void);
536 :
537 : /*
538 : * Serialize and deserialize changes for a toplevel transaction.
539 : */
540 : static void stream_open_file(Oid subid, TransactionId xid,
541 : bool first_segment);
542 : static void stream_write_change(char action, StringInfo s);
543 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
544 : static void stream_close_file(void);
545 :
546 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
547 :
548 : static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
549 : bool status_received);
550 : static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
551 : static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
552 : bool status_received);
553 : static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
554 : static void request_publisher_status(RetainDeadTuplesData *rdt_data);
555 : static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
556 : bool status_received);
557 : static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
558 : static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
559 : bool new_xid_found);
560 :
561 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
562 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
563 : ResultRelInfo *relinfo,
564 : TupleTableSlot *remoteslot);
565 : static void apply_handle_update_internal(ApplyExecutionData *edata,
566 : ResultRelInfo *relinfo,
567 : TupleTableSlot *remoteslot,
568 : LogicalRepTupleData *newtup,
569 : Oid localindexoid);
570 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
571 : ResultRelInfo *relinfo,
572 : TupleTableSlot *remoteslot,
573 : Oid localindexoid);
574 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
575 : LogicalRepRelation *remoterel,
576 : Oid localidxoid,
577 : TupleTableSlot *remoteslot,
578 : TupleTableSlot **localslot);
579 : static bool FindDeletedTupleInLocalRel(Relation localrel,
580 : Oid localidxoid,
581 : TupleTableSlot *remoteslot,
582 : TransactionId *delete_xid,
583 : RepOriginId *delete_origin,
584 : TimestampTz *delete_time);
585 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
586 : TupleTableSlot *remoteslot,
587 : LogicalRepTupleData *newtup,
588 : CmdType operation);
589 :
590 : /* Functions for skipping changes */
591 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
592 : static void stop_skipping_changes(void);
593 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
594 :
595 : /* Functions for apply error callback */
596 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
597 : static inline void reset_apply_error_context_info(void);
598 :
599 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
600 : ParallelApplyWorkerInfo **winfo);
601 :
602 : static void replorigin_reset(int code, Datum arg);
603 :
604 : /*
605 : * Form the origin name for the subscription.
606 : *
607 : * This is a common function for tablesync and other workers. Tablesync workers
608 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
609 : *
610 : * Return the name in the supplied buffer.
611 : */
612 : void
613 2558 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
614 : char *originname, Size szoriginname)
615 : {
616 2558 : if (OidIsValid(relid))
617 : {
618 : /* Replication origin name for tablesync workers. */
619 1490 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
620 : }
621 : else
622 : {
623 : /* Replication origin name for non-tablesync workers. */
624 1068 : snprintf(originname, szoriginname, "pg_%u", suboid);
625 : }
626 2558 : }
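
/*
 * Example usage (illustrative): build the origin name for a leader apply
 * worker of a subscription "suboid"; a tablesync worker would pass its relid
 * instead of InvalidOid:
 *
 *     char originname[NAMEDATALEN];
 *
 *     ReplicationOriginNameForLogicalRep(suboid, InvalidOid,
 *                                        originname, sizeof(originname));
 */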
627 :
628 : /*
629 : * Should this worker apply changes for the given relation?
630 : *
631 : * This is mainly needed for the initial relation data sync, as that runs in a
632 : * separate worker process running in parallel, and we need some way to skip
633 : * changes coming to the leader apply worker during the sync of a table.
634 : *
635 : * Note that we need a less-than-or-equal comparison for the SYNCDONE state
636 : * because it might hold the position of the end of the initial slot consistent
637 : * point WAL record + 1 (i.e. the start of the next record), and the next record
638 : * can be the COMMIT of the transaction we are now processing (which is what we
639 : * set remote_final_lsn to in apply_handle_begin).
640 : *
641 : * Note that for streaming transactions that are being applied in the parallel
642 : * apply worker, we disallow applying changes if the target table in the
643 : * subscription is not in the READY state, because we cannot decide whether to
644 : * apply the change as we won't know remote_final_lsn by that time.
645 : *
646 : * We already checked this in pa_can_start() before assigning the
647 : * streaming transaction to the parallel worker, but it also needs to be
648 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
649 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
650 : * while applying this transaction.
651 : */
652 : static bool
653 296220 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
654 : {
655 296220 : switch (MyLogicalRepWorker->type)
656 : {
657 0 : case WORKERTYPE_TABLESYNC:
658 0 : return MyLogicalRepWorker->relid == rel->localreloid;
659 :
660 136730 : case WORKERTYPE_PARALLEL_APPLY:
661 : /* We don't synchronize rel's that are in unknown state. */
662 136730 : if (rel->state != SUBREL_STATE_READY &&
663 0 : rel->state != SUBREL_STATE_UNKNOWN)
664 0 : ereport(ERROR,
665 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
666 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
667 : MySubscription->name),
668 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
669 :
670 136730 : return rel->state == SUBREL_STATE_READY;
671 :
672 159490 : case WORKERTYPE_APPLY:
673 159618 : return (rel->state == SUBREL_STATE_READY ||
674 128 : (rel->state == SUBREL_STATE_SYNCDONE &&
675 28 : rel->statelsn <= remote_final_lsn));
676 :
677 0 : case WORKERTYPE_UNKNOWN:
678 : /* Should never happen. */
679 0 : elog(ERROR, "Unknown worker type");
680 : }
681 :
682 0 : return false; /* dummy for compiler */
683 : }
684 :
685 : /*
686 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
687 : *
688 : * Start a transaction, if this is the first step (else we keep using the
689 : * existing transaction).
690 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
691 : */
692 : static void
693 297132 : begin_replication_step(void)
694 : {
695 297132 : SetCurrentStatementStartTimestamp();
696 :
697 297132 : if (!IsTransactionState())
698 : {
699 1884 : StartTransactionCommand();
700 1884 : maybe_reread_subscription();
701 : }
702 :
703 297126 : PushActiveSnapshot(GetTransactionSnapshot());
704 :
705 297126 : MemoryContextSwitchTo(ApplyMessageContext);
706 297126 : }
707 :
708 : /*
709 : * Finish up one step of a replication transaction.
710 : * Callers of begin_replication_step() must also call this.
711 : *
712 : * We don't close out the transaction here, but we should increment
713 : * the command counter to make the effects of this step visible.
714 : */
715 : static void
716 297050 : end_replication_step(void)
717 : {
718 297050 : PopActiveSnapshot();
719 :
720 297050 : CommandCounterIncrement();
721 297050 : }
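
/*
 * Typical usage of the pair above, mirroring the apply handlers later in
 * this file (e.g. apply_handle_commit_prepared()): each applied step is
 * wrapped between the two calls.
 *
 *     begin_replication_step();
 *     FinishPreparedTransaction(gid, true);
 *     end_replication_step();
 */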
722 :
723 : /*
724 : * Handle streamed transactions for both the leader apply worker and the
725 : * parallel apply workers.
726 : *
727 : * In the streaming case (receiving a block of the streamed transaction), for
728 : * serialize mode, simply redirect it to a file for the proper toplevel
729 : * transaction, and for parallel mode, the leader apply worker will send the
730 : * changes to parallel apply workers and the parallel apply worker will define
731 : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
732 : * messages will be applied by both leader apply worker and parallel apply
733 : * workers).
734 : *
735 : * Returns true for streamed transactions (when the change is either serialized
736 : * to a file or sent to a parallel apply worker), false otherwise (regular mode,
737 : * or the change needs to be processed by the parallel apply worker itself).
738 : *
739 : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
740 : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
741 : * to a parallel apply worker.
742 : */
743 : static bool
744 648842 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
745 : {
746 : TransactionId current_xid;
747 : ParallelApplyWorkerInfo *winfo;
748 : TransApplyAction apply_action;
749 : StringInfoData original_msg;
750 :
751 648842 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
752 :
753 : /* not in streaming mode */
754 648842 : if (apply_action == TRANS_LEADER_APPLY)
755 160246 : return false;
756 :
757 : Assert(TransactionIdIsValid(stream_xid));
758 :
759 : /*
760 : * The parallel apply worker needs the xid in this message to decide
761 : * whether to define a savepoint, so save the original message that has
762 : * not moved the cursor after the xid. We will serialize this message to a
763 : * file in PARTIAL_SERIALIZE mode.
764 : */
765 488596 : original_msg = *s;
766 :
767 : /*
768 : * We should have received XID of the subxact as the first part of the
769 : * message, so extract it.
770 : */
771 488596 : current_xid = pq_getmsgint(s, 4);
772 :
773 488596 : if (!TransactionIdIsValid(current_xid))
774 0 : ereport(ERROR,
775 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
776 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
777 :
778 488596 : switch (apply_action)
779 : {
780 205026 : case TRANS_LEADER_SERIALIZE:
781 : Assert(stream_fd);
782 :
783 : /* Add the new subxact to the array (unless already there). */
784 205026 : subxact_info_add(current_xid);
785 :
786 : /* Write the change to the current file */
787 205026 : stream_write_change(action, s);
788 205026 : return true;
789 :
790 136772 : case TRANS_LEADER_SEND_TO_PARALLEL:
791 : Assert(winfo);
792 :
793 : /*
794 : * XXX The publisher side doesn't always send relation/type update
795 : * messages after the streaming transaction, so also update the
796 : * relation/type in leader apply worker. See function
797 : * cleanup_rel_sync_cache.
798 : */
799 136772 : if (pa_send_data(winfo, s->len, s->data))
800 136772 : return (action != LOGICAL_REP_MSG_RELATION &&
801 : action != LOGICAL_REP_MSG_TYPE);
802 :
803 : /*
804 : * Switch to serialize mode when we are not able to send the
805 : * change to parallel apply worker.
806 : */
807 0 : pa_switch_to_partial_serialize(winfo, false);
808 :
809 : /* fall through */
810 10012 : case TRANS_LEADER_PARTIAL_SERIALIZE:
811 10012 : stream_write_change(action, &original_msg);
812 :
813 : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
814 10012 : return (action != LOGICAL_REP_MSG_RELATION &&
815 : action != LOGICAL_REP_MSG_TYPE);
816 :
817 136786 : case TRANS_PARALLEL_APPLY:
818 136786 : parallel_stream_nchanges += 1;
819 :
820 : /* Define a savepoint for a subxact if needed. */
821 136786 : pa_start_subtrans(current_xid, stream_xid);
822 136786 : return false;
823 :
824 0 : default:
825 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
826 : return false; /* silence compiler warning */
827 : }
828 : }
829 :
830 : /*
831 : * Executor state preparation for evaluation of constraint expressions,
832 : * indexes and triggers for the specified relation.
833 : *
834 : * Note that the caller must open and close any indexes to be updated.
835 : */
836 : static ApplyExecutionData *
837 296062 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
838 : {
839 : ApplyExecutionData *edata;
840 : EState *estate;
841 : RangeTblEntry *rte;
842 296062 : List *perminfos = NIL;
843 : ResultRelInfo *resultRelInfo;
844 :
845 296062 : edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
846 296062 : edata->targetRel = rel;
847 :
848 296062 : edata->estate = estate = CreateExecutorState();
849 :
850 296062 : rte = makeNode(RangeTblEntry);
851 296062 : rte->rtekind = RTE_RELATION;
852 296062 : rte->relid = RelationGetRelid(rel->localrel);
853 296062 : rte->relkind = rel->localrel->rd_rel->relkind;
854 296062 : rte->rellockmode = AccessShareLock;
855 :
856 296062 : addRTEPermissionInfo(&perminfos, rte);
857 :
858 296062 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
859 : bms_make_singleton(1));
860 :
861 296062 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
862 :
863 : /*
864 : * Use Relation opened by logicalrep_rel_open() instead of opening it
865 : * again.
866 : */
867 296062 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
868 :
869 : /*
870 : * We put the ResultRelInfo in the es_opened_result_relations list, even
871 : * though we don't populate the es_result_relations array. That's a bit
872 : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
873 : *
874 : * ExecOpenIndices() is not called here either, each execution path doing
875 : * an apply operation being responsible for that.
876 : */
877 296062 : estate->es_opened_result_relations =
878 296062 : lappend(estate->es_opened_result_relations, resultRelInfo);
879 :
880 296062 : estate->es_output_cid = GetCurrentCommandId(true);
881 :
882 : /* Prepare to catch AFTER triggers. */
883 296062 : AfterTriggerBeginQuery();
884 :
885 : /* other fields of edata remain NULL for now */
886 :
887 296062 : return edata;
888 : }
889 :
890 : /*
891 : * Finish any operations related to the executor state created by
892 : * create_edata_for_relation().
893 : */
894 : static void
895 296004 : finish_edata(ApplyExecutionData *edata)
896 : {
897 296004 : EState *estate = edata->estate;
898 :
899 : /* Handle any queued AFTER triggers. */
900 296004 : AfterTriggerEndQuery(estate);
901 :
902 : /* Shut down tuple routing, if any was done. */
903 296004 : if (edata->proute)
904 148 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
905 :
906 : /*
907 : * Cleanup. It might seem that we should call ExecCloseResultRelations()
908 : * here, but we intentionally don't. It would close the rel we added to
909 : * es_opened_result_relations above, which is wrong because we took no
910 : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
911 : * any other relations opened during execution.
912 : */
913 296004 : ExecResetTupleTable(estate->es_tupleTable, false);
914 296004 : FreeExecutorState(estate);
915 296004 : pfree(edata);
916 296004 : }
917 :
918 : /*
919 : * Evaluates default values for columns that we cannot map to remote
920 : * relation columns.
921 : *
922 : * This allows us to support tables which have more columns on the downstream
923 : * than on the upstream.
924 : */
925 : static void
926 151550 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
927 : TupleTableSlot *slot)
928 : {
929 151550 : TupleDesc desc = RelationGetDescr(rel->localrel);
930 151550 : int num_phys_attrs = desc->natts;
931 : int i;
932 : int attnum,
933 151550 : num_defaults = 0;
934 : int *defmap;
935 : ExprState **defexprs;
936 : ExprContext *econtext;
937 :
938 151550 : econtext = GetPerTupleExprContext(estate);
939 :
940 : /* We got all the data via replication, no need to evaluate anything. */
941 151550 : if (num_phys_attrs == rel->remoterel.natts)
942 71260 : return;
943 :
944 80290 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
945 80290 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
946 :
947 : Assert(rel->attrmap->maplen == num_phys_attrs);
948 421326 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
949 : {
950 : Expr *defexpr;
951 :
952 341036 : if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
953 18 : continue;
954 :
955 341018 : if (rel->attrmap->attnums[attnum] >= 0)
956 184536 : continue;
957 :
958 156482 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
959 :
960 156482 : if (defexpr != NULL)
961 : {
962 : /* Run the expression through planner */
963 140262 : defexpr = expression_planner(defexpr);
964 :
965 : /* Initialize executable expression in copycontext */
966 140262 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
967 140262 : defmap[num_defaults] = attnum;
968 140262 : num_defaults++;
969 : }
970 : }
971 :
972 220552 : for (i = 0; i < num_defaults; i++)
973 140262 : slot->tts_values[defmap[i]] =
974 140262 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
975 : }
976 :
977 : /*
978 : * Store tuple data into slot.
979 : *
980 : * Incoming data can be either text or binary format.
981 : */
982 : static void
983 296084 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
984 : LogicalRepTupleData *tupleData)
985 : {
986 296084 : int natts = slot->tts_tupleDescriptor->natts;
987 : int i;
988 :
989 296084 : ExecClearTuple(slot);
990 :
991 : /* Call the "in" function for each non-dropped, non-null attribute */
992 : Assert(natts == rel->attrmap->maplen);
993 1315150 : for (i = 0; i < natts; i++)
994 : {
995 1019066 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
996 1019066 : int remoteattnum = rel->attrmap->attnums[i];
997 :
998 1019066 : if (!att->attisdropped && remoteattnum >= 0)
999 605232 : {
1000 605232 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1001 :
1002 : Assert(remoteattnum < tupleData->ncols);
1003 :
1004 : /* Set attnum for error callback */
1005 605232 : apply_error_callback_arg.remote_attnum = remoteattnum;
1006 :
1007 605232 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1008 : {
1009 : Oid typinput;
1010 : Oid typioparam;
1011 :
1012 284616 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1013 569232 : slot->tts_values[i] =
1014 284616 : OidInputFunctionCall(typinput, colvalue->data,
1015 : typioparam, att->atttypmod);
1016 284616 : slot->tts_isnull[i] = false;
1017 : }
1018 320616 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1019 : {
1020 : Oid typreceive;
1021 : Oid typioparam;
1022 :
1023 : /*
1024 : * In some code paths we may be asked to re-parse the same
1025 : * tuple data. Reset the StringInfo's cursor so that works.
1026 : */
1027 219960 : colvalue->cursor = 0;
1028 :
1029 219960 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1030 439920 : slot->tts_values[i] =
1031 219960 : OidReceiveFunctionCall(typreceive, colvalue,
1032 : typioparam, att->atttypmod);
1033 :
1034 : /* Trouble if it didn't eat the whole buffer */
1035 219960 : if (colvalue->cursor != colvalue->len)
1036 0 : ereport(ERROR,
1037 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1038 : errmsg("incorrect binary data format in logical replication column %d",
1039 : remoteattnum + 1)));
1040 219960 : slot->tts_isnull[i] = false;
1041 : }
1042 : else
1043 : {
1044 : /*
1045 : * NULL value from remote. (We don't expect to see
1046 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1047 : * NULL.)
1048 : */
1049 100656 : slot->tts_values[i] = (Datum) 0;
1050 100656 : slot->tts_isnull[i] = true;
1051 : }
1052 :
1053 : /* Reset attnum for error callback */
1054 605232 : apply_error_callback_arg.remote_attnum = -1;
1055 : }
1056 : else
1057 : {
1058 : /*
1059 : * We assign NULL to dropped attributes and missing values
1060 : * (missing values should be later filled using
1061 : * slot_fill_defaults).
1062 : */
1063 413834 : slot->tts_values[i] = (Datum) 0;
1064 413834 : slot->tts_isnull[i] = true;
1065 : }
1066 : }
1067 :
1068 296084 : ExecStoreVirtualTuple(slot);
1069 296084 : }
1070 :
1071 : /*
1072 : * Replace updated columns with data from the LogicalRepTupleData struct.
1073 : * This is somewhat similar to heap_modify_tuple but also calls the type
1074 : * input functions on the user data.
1075 : *
1076 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
1077 : * columns provided in "tupleData" and leaving others as-is.
1078 : *
1079 : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
1080 : * storage for "srcslot". This is OK for current usage, but someday we may
1081 : * need to materialize "slot" at the end to make it independent of "srcslot".
1082 : */
1083 : static void
1084 63848 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
1085 : LogicalRepRelMapEntry *rel,
1086 : LogicalRepTupleData *tupleData)
1087 : {
1088 63848 : int natts = slot->tts_tupleDescriptor->natts;
1089 : int i;
1090 :
1091 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1092 63848 : ExecClearTuple(slot);
1093 :
1094 : /*
1095 : * Copy all the column data from srcslot, so that we'll have valid values
1096 : * for unreplaced columns.
1097 : */
1098 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
1099 63848 : slot_getallattrs(srcslot);
1100 63848 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1101 63848 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1102 :
1103 : /* Call the "in" function for each replaced attribute */
1104 : Assert(natts == rel->attrmap->maplen);
1105 318560 : for (i = 0; i < natts; i++)
1106 : {
1107 254712 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1108 254712 : int remoteattnum = rel->attrmap->attnums[i];
1109 :
1110 254712 : if (remoteattnum < 0)
1111 117038 : continue;
1112 :
1113 : Assert(remoteattnum < tupleData->ncols);
1114 :
1115 137674 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1116 : {
1117 137668 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1118 :
1119 : /* Set attnum for error callback */
1120 137668 : apply_error_callback_arg.remote_attnum = remoteattnum;
1121 :
1122 137668 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1123 : {
1124 : Oid typinput;
1125 : Oid typioparam;
1126 :
1127 50860 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1128 101720 : slot->tts_values[i] =
1129 50860 : OidInputFunctionCall(typinput, colvalue->data,
1130 : typioparam, att->atttypmod);
1131 50860 : slot->tts_isnull[i] = false;
1132 : }
1133 86808 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1134 : {
1135 : Oid typreceive;
1136 : Oid typioparam;
1137 :
1138 : /*
1139 : * In some code paths we may be asked to re-parse the same
1140 : * tuple data. Reset the StringInfo's cursor so that works.
1141 : */
1142 86712 : colvalue->cursor = 0;
1143 :
1144 86712 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1145 173424 : slot->tts_values[i] =
1146 86712 : OidReceiveFunctionCall(typreceive, colvalue,
1147 : typioparam, att->atttypmod);
1148 :
1149 : /* Trouble if it didn't eat the whole buffer */
1150 86712 : if (colvalue->cursor != colvalue->len)
1151 0 : ereport(ERROR,
1152 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1153 : errmsg("incorrect binary data format in logical replication column %d",
1154 : remoteattnum + 1)));
1155 86712 : slot->tts_isnull[i] = false;
1156 : }
1157 : else
1158 : {
1159 : /* must be LOGICALREP_COLUMN_NULL */
1160 96 : slot->tts_values[i] = (Datum) 0;
1161 96 : slot->tts_isnull[i] = true;
1162 : }
1163 :
1164 : /* Reset attnum for error callback */
1165 137668 : apply_error_callback_arg.remote_attnum = -1;
1166 : }
1167 : }
1168 :
1169 : /* And finally, declare that "slot" contains a valid virtual tuple */
1170 63848 : ExecStoreVirtualTuple(slot);
1171 63848 : }
1172 :
1173 : /*
1174 : * Handle BEGIN message.
1175 : */
1176 : static void
1177 936 : apply_handle_begin(StringInfo s)
1178 : {
1179 : LogicalRepBeginData begin_data;
1180 :
1181 : /* There must not be an active streaming transaction. */
1182 : Assert(!TransactionIdIsValid(stream_xid));
1183 :
1184 936 : logicalrep_read_begin(s, &begin_data);
1185 936 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1186 :
1187 936 : remote_final_lsn = begin_data.final_lsn;
1188 :
1189 936 : maybe_start_skipping_changes(begin_data.final_lsn);
1190 :
1191 936 : in_remote_transaction = true;
1192 :
1193 936 : pgstat_report_activity(STATE_RUNNING, NULL);
1194 936 : }
1195 :
1196 : /*
1197 : * Handle COMMIT message.
1198 : *
1199 : * TODO, support tracking of multiple origins
1200 : */
1201 : static void
1202 860 : apply_handle_commit(StringInfo s)
1203 : {
1204 : LogicalRepCommitData commit_data;
1205 :
1206 860 : logicalrep_read_commit(s, &commit_data);
1207 :
1208 860 : if (commit_data.commit_lsn != remote_final_lsn)
1209 0 : ereport(ERROR,
1210 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1211 : errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1212 : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1213 : LSN_FORMAT_ARGS(remote_final_lsn))));
1214 :
1215 860 : apply_handle_commit_internal(&commit_data);
1216 :
1217 : /* Process any tables that are being synchronized in parallel. */
1218 860 : process_syncing_tables(commit_data.end_lsn);
1219 :
1220 860 : pgstat_report_activity(STATE_IDLE, NULL);
1221 860 : reset_apply_error_context_info();
1222 860 : }
1223 :
1224 : /*
1225 : * Handle BEGIN PREPARE message.
1226 : */
1227 : static void
1228 32 : apply_handle_begin_prepare(StringInfo s)
1229 : {
1230 : LogicalRepPreparedTxnData begin_data;
1231 :
1232 : /* Tablesync should never receive prepare. */
1233 32 : if (am_tablesync_worker())
1234 0 : ereport(ERROR,
1235 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1236 : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1237 :
1238 : /* There must not be an active streaming transaction. */
1239 : Assert(!TransactionIdIsValid(stream_xid));
1240 :
1241 32 : logicalrep_read_begin_prepare(s, &begin_data);
1242 32 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1243 :
1244 32 : remote_final_lsn = begin_data.prepare_lsn;
1245 :
1246 32 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1247 :
1248 32 : in_remote_transaction = true;
1249 :
1250 32 : pgstat_report_activity(STATE_RUNNING, NULL);
1251 32 : }
1252 :
1253 : /*
1254 : * Common function to prepare the GID.
1255 : */
1256 : static void
1257 46 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1258 : {
1259 : char gid[GIDSIZE];
1260 :
1261 : /*
1262 : * Compute a unique GID for two_phase transactions. We don't use the GID of
1263 : * the prepared transaction sent by the server, as that can lead to deadlock
1264 : * when multiple subscriptions from the same node point to publications on
1265 : * the same node. See comments atop worker.c.
1266 : */
1267 46 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1268 : gid, sizeof(gid));
1269 :
1270 : /*
1271 : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1272 : * called within the PrepareTransactionBlock below.
1273 : */
1274 46 : if (!IsTransactionBlock())
1275 : {
1276 46 : BeginTransactionBlock();
1277 46 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1278 : }
1279 :
1280 : /*
1281 : * Update origin state so we can restart streaming from correct position
1282 : * in case of crash.
1283 : */
1284 46 : replorigin_session_origin_lsn = prepare_data->end_lsn;
1285 46 : replorigin_session_origin_timestamp = prepare_data->prepare_time;
1286 :
1287 46 : PrepareTransactionBlock(gid);
1288 46 : }
1289 :
1290 : /*
1291 : * Handle PREPARE message.
1292 : */
1293 : static void
1294 30 : apply_handle_prepare(StringInfo s)
1295 : {
1296 : LogicalRepPreparedTxnData prepare_data;
1297 :
1298 30 : logicalrep_read_prepare(s, &prepare_data);
1299 :
1300 30 : if (prepare_data.prepare_lsn != remote_final_lsn)
1301 0 : ereport(ERROR,
1302 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1303 : errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1304 : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1305 : LSN_FORMAT_ARGS(remote_final_lsn))));
1306 :
1307 : /*
1308 : * Unlike commit, here, we always prepare the transaction even though no
1309 : * change has happened in this transaction or all changes are skipped. It
1310 : * is done this way because at commit prepared time, we won't know whether
1311 : * we have skipped preparing a transaction because of those reasons.
1312 : *
1313 : * XXX, We can optimize such that at commit prepared time, we first check
1314 : * whether we have prepared the transaction or not but that doesn't seem
1315 : * worthwhile because such cases shouldn't be common.
1316 : */
1317 30 : begin_replication_step();
1318 :
1319 30 : apply_handle_prepare_internal(&prepare_data);
1320 :
1321 30 : end_replication_step();
1322 30 : CommitTransactionCommand();
1323 28 : pgstat_report_stat(false);
1324 :
1325 : /*
1326 : * It is okay not to set the local_end LSN for the prepare because we
1327 : * always flush the prepare record. So, we can send the acknowledgment of
1328 : * the remote_end LSN as soon as prepare is finished.
1329 : *
1330 : * XXX For the sake of consistency with commit, we could have set it with
1331 : * the LSN of prepare but as of now we don't track that value similar to
1332 : * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
1333 : * it.
1334 : */
1335 28 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1336 :
1337 28 : in_remote_transaction = false;
1338 :
1339 : /* Process any tables that are being synchronized in parallel. */
1340 28 : process_syncing_tables(prepare_data.end_lsn);
1341 :
1342 : /*
1343 : * Since we have already prepared the transaction, in a case where the
1344 : * server crashes before clearing the subskiplsn, it will be left but the
1345 : * transaction won't be resent. But that's okay because it's a rare case
1346 : * and the subskiplsn will be cleared when finishing the next transaction.
1347 : */
1348 28 : stop_skipping_changes();
1349 28 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1350 :
1351 28 : pgstat_report_activity(STATE_IDLE, NULL);
1352 28 : reset_apply_error_context_info();
1353 28 : }
1354 :
1355 : /*
1356 : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1357 : *
1358 : * Note that we don't need to wait here if the transaction was prepared in a
1359 : * parallel apply worker. In that case, we have already waited for the prepare
1360 : * to finish in apply_handle_stream_prepare() which will ensure all the
1361 : * operations in that transaction have happened in the subscriber, so no
1362 : * concurrent transaction can cause deadlock or transaction dependency issues.
1363 : */
1364 : static void
1365 40 : apply_handle_commit_prepared(StringInfo s)
1366 : {
1367 : LogicalRepCommitPreparedTxnData prepare_data;
1368 : char gid[GIDSIZE];
1369 :
1370 40 : logicalrep_read_commit_prepared(s, &prepare_data);
1371 40 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1372 :
1373 : /* Compute GID for two_phase transactions. */
1374 40 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1375 : gid, sizeof(gid));
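: /*
: * A sketch of what the GID looks like (an assumption inferred from the
: * arguments above, not from code shown here): TwoPhaseTransactionGid is
: * expected to combine the subscription OID and the remote transaction ID,
: * roughly "pg_gid_<suboid>_<xid>", so the prepared transaction can be
: * located again at COMMIT PREPARED / ROLLBACK PREPARED time.
: */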
1376 :
1377 : /* There is no transaction when COMMIT PREPARED is called */
1378 40 : begin_replication_step();
1379 :
1380 : /*
1381 : * Update origin state so we can restart streaming from correct position
1382 : * in case of crash.
1383 : */
1384 40 : replorigin_session_origin_lsn = prepare_data.end_lsn;
1385 40 : replorigin_session_origin_timestamp = prepare_data.commit_time;
1386 :
1387 40 : FinishPreparedTransaction(gid, true);
1388 40 : end_replication_step();
1389 40 : CommitTransactionCommand();
1390 40 : pgstat_report_stat(false);
1391 :
1392 40 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1393 40 : in_remote_transaction = false;
1394 :
1395 : /* Process any tables that are being synchronized in parallel. */
1396 40 : process_syncing_tables(prepare_data.end_lsn);
1397 :
1398 40 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1399 :
1400 40 : pgstat_report_activity(STATE_IDLE, NULL);
1401 40 : reset_apply_error_context_info();
1402 40 : }
1403 :
1404 : /*
1405 : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1406 : *
1407 : * Note that we don't need to wait here if the transaction was prepared in a
1408 : * parallel apply worker. In that case, we have already waited for the prepare
1409 : * to finish in apply_handle_stream_prepare() which will ensure all the
1410 : * operations in that transaction have happened in the subscriber, so no
1411 : * concurrent transaction can cause deadlock or transaction dependency issues.
1412 : */
1413 : static void
1414 10 : apply_handle_rollback_prepared(StringInfo s)
1415 : {
1416 : LogicalRepRollbackPreparedTxnData rollback_data;
1417 : char gid[GIDSIZE];
1418 :
1419 10 : logicalrep_read_rollback_prepared(s, &rollback_data);
1420 10 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1421 :
1422 : /* Compute GID for two_phase transactions. */
1423 10 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1424 : gid, sizeof(gid));
1425 :
1426 : /*
1427 : * It is possible that we haven't received the prepare because it occurred
1428 : * before the walsender reached a consistent point, or because two_phase
1429 : * was not yet enabled at that time; in such cases we need to skip the
1430 : * rollback prepared.
1431 : */
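: /*
: * As understood here, LookupGXact() returns true only if a prepared
: * transaction matching this GID (and the given prepare end LSN and
: * timestamp) exists locally, which is how the "skip" described above is
: * implemented.
: */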
1432 10 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1433 : rollback_data.prepare_time))
1434 : {
1435 : /*
1436 : * Update origin state so we can restart streaming from correct
1437 : * position in case of crash.
1438 : */
1439 10 : replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1440 10 : replorigin_session_origin_timestamp = rollback_data.rollback_time;
1441 :
1442 : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1443 10 : begin_replication_step();
1444 10 : FinishPreparedTransaction(gid, false);
1445 10 : end_replication_step();
1446 10 : CommitTransactionCommand();
1447 :
1448 10 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1449 : }
1450 :
1451 10 : pgstat_report_stat(false);
1452 :
1453 : /*
1454 : * It is okay not to set the local_end LSN for the rollback of prepared
1455 : * transaction because we always flush the WAL record for it. See
1456 : * apply_handle_prepare.
1457 : */
1458 10 : store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
1459 10 : in_remote_transaction = false;
1460 :
1461 : /* Process any tables that are being synchronized in parallel. */
1462 10 : process_syncing_tables(rollback_data.rollback_end_lsn);
1463 :
1464 10 : pgstat_report_activity(STATE_IDLE, NULL);
1465 10 : reset_apply_error_context_info();
1466 10 : }
1467 :
1468 : /*
1469 : * Handle STREAM PREPARE.
1470 : */
1471 : static void
1472 22 : apply_handle_stream_prepare(StringInfo s)
1473 : {
1474 : LogicalRepPreparedTxnData prepare_data;
1475 : ParallelApplyWorkerInfo *winfo;
1476 : TransApplyAction apply_action;
1477 :
1478 : /* Save the message before it is consumed. */
1479 22 : StringInfoData original_msg = *s;
1480 :
1481 22 : if (in_streamed_transaction)
1482 0 : ereport(ERROR,
1483 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1484 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1485 :
1486 : /* Tablesync should never receive prepare. */
1487 22 : if (am_tablesync_worker())
1488 0 : ereport(ERROR,
1489 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1490 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1491 :
1492 22 : logicalrep_read_stream_prepare(s, &prepare_data);
1493 22 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1494 :
1495 22 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
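: /*
: * Roughly, based on the cases handled below: TRANS_LEADER_APPLY means the
: * streamed transaction was serialized to a file by this (leader) worker and
: * is replayed here; TRANS_LEADER_SEND_TO_PARALLEL forwards the message to a
: * parallel apply worker; TRANS_LEADER_PARTIAL_SERIALIZE spools it to a file
: * for the parallel apply worker to read; TRANS_PARALLEL_APPLY means we are
: * the parallel apply worker itself.
: */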
1496 :
1497 22 : switch (apply_action)
1498 : {
1499 10 : case TRANS_LEADER_APPLY:
1500 :
1501 : /*
1502 : * The transaction has been serialized to file, so replay all the
1503 : * spooled operations.
1504 : */
1505 10 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1506 : prepare_data.xid, prepare_data.prepare_lsn);
1507 :
1508 : /* Mark the transaction as prepared. */
1509 10 : apply_handle_prepare_internal(&prepare_data);
1510 :
1511 10 : CommitTransactionCommand();
1512 :
1513 : /*
1514 : * It is okay not to set the local_end LSN for the prepare because
1515 : * we always flush the prepare record. See apply_handle_prepare.
1516 : */
1517 10 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1518 :
1519 10 : in_remote_transaction = false;
1520 :
1521 : /* Unlink the files with serialized changes and subxact info. */
1522 10 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1523 :
1524 10 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1525 10 : break;
1526 :
1527 4 : case TRANS_LEADER_SEND_TO_PARALLEL:
1528 : Assert(winfo);
1529 :
1530 4 : if (pa_send_data(winfo, s->len, s->data))
1531 : {
1532 : /* Finish processing the streaming transaction. */
1533 4 : pa_xact_finish(winfo, prepare_data.end_lsn);
1534 4 : break;
1535 : }
1536 :
1537 : /*
1538 : * Switch to serialize mode when we are not able to send the
1539 : * change to parallel apply worker.
1540 : */
1541 0 : pa_switch_to_partial_serialize(winfo, true);
1542 :
1543 : /* fall through */
1544 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1545 : Assert(winfo);
1546 :
1547 2 : stream_open_and_write_change(prepare_data.xid,
1548 : LOGICAL_REP_MSG_STREAM_PREPARE,
1549 : &original_msg);
1550 :
1551 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1552 :
1553 : /* Finish processing the streaming transaction. */
1554 2 : pa_xact_finish(winfo, prepare_data.end_lsn);
1555 2 : break;
1556 :
1557 6 : case TRANS_PARALLEL_APPLY:
1558 :
1559 : /*
1560 : * If the parallel apply worker is applying spooled messages then
1561 : * close the file before preparing.
1562 : */
1563 6 : if (stream_fd)
1564 2 : stream_close_file();
1565 :
1566 6 : begin_replication_step();
1567 :
1568 : /* Mark the transaction as prepared. */
1569 6 : apply_handle_prepare_internal(&prepare_data);
1570 :
1571 6 : end_replication_step();
1572 :
1573 6 : CommitTransactionCommand();
1574 :
1575 : /*
1576 : * It is okay not to set the local_end LSN for the prepare because
1577 : * we always flush the prepare record. See apply_handle_prepare.
1578 : */
1579 6 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1580 :
1581 6 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1582 6 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1583 :
1584 6 : pa_reset_subtrans();
1585 :
1586 6 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1587 6 : break;
1588 :
1589 0 : default:
1590 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1591 : break;
1592 : }
1593 :
1594 22 : pgstat_report_stat(false);
1595 :
1596 : /* Process any tables that are being synchronized in parallel. */
1597 22 : process_syncing_tables(prepare_data.end_lsn);
1598 :
1599 : /*
1600 : * As in the prepare case, the subskiplsn could be left behind after a
1601 : * server crash, but that's okay. See the comments in apply_handle_prepare().
1602 : */
1603 22 : stop_skipping_changes();
1604 22 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1605 :
1606 22 : pgstat_report_activity(STATE_IDLE, NULL);
1607 :
1608 22 : reset_apply_error_context_info();
1609 22 : }
1610 :
1611 : /*
1612 : * Handle ORIGIN message.
1613 : *
1614 : * TODO, support tracking of multiple origins
1615 : */
1616 : static void
1617 14 : apply_handle_origin(StringInfo s)
1618 : {
1619 : /*
1620 : * An ORIGIN message can only arrive inside a streamed transaction or inside
1621 : * a remote transaction, and before any actual writes.
1622 : */
1623 14 : if (!in_streamed_transaction &&
1624 20 : (!in_remote_transaction ||
1625 10 : (IsTransactionState() && !am_tablesync_worker())))
1626 0 : ereport(ERROR,
1627 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1628 : errmsg_internal("ORIGIN message sent out of order")));
1629 14 : }
1630 :
1631 : /*
1632 : * Initialize fileset (if not already done).
1633 : *
1634 : * Create a new file when first_segment is true, otherwise open the existing
1635 : * file.
1636 : */
1637 : void
1638 726 : stream_start_internal(TransactionId xid, bool first_segment)
1639 : {
1640 726 : begin_replication_step();
1641 :
1642 : /*
1643 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1644 : * used for the entire duration of the worker so create it in a permanent
1645 : * context. We create this on the very first streaming message from any
1646 : * transaction and then use it for this and other streaming transactions.
1647 : * We could instead create the fileset when the worker starts, but then we
1648 : * couldn't be sure it would ever be used.
1649 : */
1650 726 : if (!MyLogicalRepWorker->stream_fileset)
1651 : {
1652 : MemoryContext oldctx;
1653 :
1654 28 : oldctx = MemoryContextSwitchTo(ApplyContext);
1655 :
1656 28 : MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1657 28 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1658 :
1659 28 : MemoryContextSwitchTo(oldctx);
1660 : }
1661 :
1662 : /* Open the spool file for this transaction. */
1663 726 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1664 :
1665 : /* If this is not the first segment, open existing subxact file. */
1666 726 : if (!first_segment)
1667 662 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1668 :
1669 726 : end_replication_step();
1670 726 : }
1671 :
1672 : /*
1673 : * Handle STREAM START message.
1674 : */
1675 : static void
1676 1710 : apply_handle_stream_start(StringInfo s)
1677 : {
1678 : bool first_segment;
1679 : ParallelApplyWorkerInfo *winfo;
1680 : TransApplyAction apply_action;
1681 :
1682 : /* Save the message before it is consumed. */
1683 1710 : StringInfoData original_msg = *s;
1684 :
1685 1710 : if (in_streamed_transaction)
1686 0 : ereport(ERROR,
1687 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1688 : errmsg_internal("duplicate STREAM START message")));
1689 :
1690 : /* There must not be an active streaming transaction. */
1691 : Assert(!TransactionIdIsValid(stream_xid));
1692 :
1693 : /* notify handle methods we're processing a remote transaction */
1694 1710 : in_streamed_transaction = true;
1695 :
1696 : /* extract XID of the top-level transaction */
1697 1710 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1698 :
1699 1710 : if (!TransactionIdIsValid(stream_xid))
1700 0 : ereport(ERROR,
1701 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1702 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1703 :
1704 1710 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1705 :
1706 : /* Try to allocate a worker for the streaming transaction. */
1707 1710 : if (first_segment)
1708 164 : pa_allocate_worker(stream_xid);
1709 :
1710 1710 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1711 :
1712 1710 : switch (apply_action)
1713 : {
1714 686 : case TRANS_LEADER_SERIALIZE:
1715 :
1716 : /*
1717 : * Function stream_start_internal starts a transaction. This
1718 : * transaction will be committed at stream stop, unless we are a
1719 : * tablesync worker, in which case it is committed after processing
1720 : * all the messages. We need this transaction for handling the
1721 : * BufFile used to serialize the streaming data and subxact
1722 : * info.
1723 : */
1724 686 : stream_start_internal(stream_xid, first_segment);
1725 686 : break;
1726 :
1727 500 : case TRANS_LEADER_SEND_TO_PARALLEL:
1728 : Assert(winfo);
1729 :
1730 : /*
1731 : * Once we start serializing the changes, the parallel apply
1732 : * worker will wait for the leader to release the stream lock
1733 : * until the end of the transaction. So, we don't need to release
1734 : * the lock or increment the stream count in that case.
1735 : */
1736 500 : if (pa_send_data(winfo, s->len, s->data))
1737 : {
1738 : /*
1739 : * Unlock the shared object lock so that the parallel apply
1740 : * worker can continue to receive changes.
1741 : */
1742 492 : if (!first_segment)
1743 446 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1744 :
1745 : /*
1746 : * Increment the number of streaming blocks waiting to be
1747 : * processed by parallel apply worker.
1748 : */
1749 492 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
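: /*
: * The matching decrement happens in the parallel apply worker, via
: * pa_decr_and_wait_stream_block() when it handles the corresponding
: * STREAM STOP (see the TRANS_PARALLEL_APPLY case in
: * apply_handle_stream_stop below).
: */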
1750 :
1751 : /* Cache the parallel apply worker for this transaction. */
1752 492 : pa_set_stream_apply_worker(winfo);
1753 492 : break;
1754 : }
1755 :
1756 : /*
1757 : * Switch to serialize mode when we are not able to send the
1758 : * change to parallel apply worker.
1759 : */
1760 8 : pa_switch_to_partial_serialize(winfo, !first_segment);
1761 :
1762 : /* fall through */
1763 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1764 : Assert(winfo);
1765 :
1766 : /*
1767 : * Open the spool file unless it was already opened when switching
1768 : * to serialize mode. The transaction started in
1769 : * stream_start_internal will be committed on the stream stop.
1770 : */
1771 30 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1772 22 : stream_start_internal(stream_xid, first_segment);
1773 :
1774 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1775 :
1776 : /* Cache the parallel apply worker for this transaction. */
1777 30 : pa_set_stream_apply_worker(winfo);
1778 30 : break;
1779 :
1780 502 : case TRANS_PARALLEL_APPLY:
1781 502 : if (first_segment)
1782 : {
1783 : /* Hold the lock until the end of the transaction. */
1784 54 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1785 54 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1786 :
1787 : /*
1788 : * Signal the leader apply worker, as it may be waiting for
1789 : * us.
1790 : */
1791 54 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
1792 : }
1793 :
1794 502 : parallel_stream_nchanges = 0;
1795 502 : break;
1796 :
1797 0 : default:
1798 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1799 : break;
1800 : }
1801 :
1802 1710 : pgstat_report_activity(STATE_RUNNING, NULL);
1803 1710 : }
1804 :
1805 : /*
1806 : * Update the information about subxacts and close the file.
1807 : *
1808 : * This function should only be called after stream_start_internal has
1809 : * been called.
1810 : */
1811 : void
1812 726 : stream_stop_internal(TransactionId xid)
1813 : {
1814 : /*
1815 : * Serialize information about subxacts for the toplevel transaction, then
1816 : * close the stream messages spool file.
1817 : */
1818 726 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1819 726 : stream_close_file();
1820 :
1821 : /* We must be in a valid transaction state */
1822 : Assert(IsTransactionState());
1823 :
1824 : /* Commit the per-stream transaction */
1825 726 : CommitTransactionCommand();
1826 :
1827 : /* Reset per-stream context */
1828 726 : MemoryContextReset(LogicalStreamingContext);
1829 726 : }
1830 :
1831 : /*
1832 : * Handle STREAM STOP message.
1833 : */
1834 : static void
1835 1708 : apply_handle_stream_stop(StringInfo s)
1836 : {
1837 : ParallelApplyWorkerInfo *winfo;
1838 : TransApplyAction apply_action;
1839 :
1840 1708 : if (!in_streamed_transaction)
1841 0 : ereport(ERROR,
1842 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1843 : errmsg_internal("STREAM STOP message without STREAM START")));
1844 :
1845 1708 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1846 :
1847 1708 : switch (apply_action)
1848 : {
1849 686 : case TRANS_LEADER_SERIALIZE:
1850 686 : stream_stop_internal(stream_xid);
1851 686 : break;
1852 :
1853 492 : case TRANS_LEADER_SEND_TO_PARALLEL:
1854 : Assert(winfo);
1855 :
1856 : /*
1857 : * Lock before sending the STREAM_STOP message so that the leader
1858 : * can hold the lock first and the parallel apply worker will wait
1859 : * for leader to release the lock. See Locking Considerations atop
1860 : * applyparallelworker.c.
1861 : */
1862 492 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1863 :
1864 492 : if (pa_send_data(winfo, s->len, s->data))
1865 : {
1866 492 : pa_set_stream_apply_worker(NULL);
1867 492 : break;
1868 : }
1869 :
1870 : /*
1871 : * Switch to serialize mode when we are not able to send the
1872 : * change to parallel apply worker.
1873 : */
1874 0 : pa_switch_to_partial_serialize(winfo, true);
1875 :
1876 : /* fall through */
1877 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1878 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1879 30 : stream_stop_internal(stream_xid);
1880 30 : pa_set_stream_apply_worker(NULL);
1881 30 : break;
1882 :
1883 500 : case TRANS_PARALLEL_APPLY:
1884 500 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1885 : parallel_stream_nchanges);
1886 :
1887 : /*
1888 : * By the time the parallel apply worker is processing the changes in
1889 : * the current streaming block, the leader apply worker may have
1890 : * sent multiple streaming blocks. This can lead to the parallel
1891 : * apply worker starting to wait even when there are more chunks of
1892 : * streams in the queue. So, try to lock only if there is no message
1893 : * left in the queue. See Locking Considerations atop
1894 : * applyparallelworker.c.
1895 : *
1896 : * Note that here we have a race condition where we can start
1897 : * waiting even when there are pending streaming chunks. This can
1898 : * happen if the leader sends another streaming block and acquires
1899 : * the stream lock again after the parallel apply worker checks
1900 : * that there is no pending streaming block and before it actually
1901 : * starts waiting on a lock. We can handle this case by not
1902 : * allowing the leader to increment the stream block count while
1903 : * the parallel apply worker acquires the lock, but it is not
1904 : * clear whether that is worth the complexity.
1905 : *
1906 : * Now, if this missed chunk contains rollback to savepoint, then
1907 : * there is a risk of deadlock which probably shouldn't happen
1908 : * after restart.
1909 : */
1910 500 : pa_decr_and_wait_stream_block();
1911 496 : break;
1912 :
1913 0 : default:
1914 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1915 : break;
1916 : }
1917 :
1918 1704 : in_streamed_transaction = false;
1919 1704 : stream_xid = InvalidTransactionId;
1920 :
1921 : /*
1922 : * The parallel apply worker could be in a transaction in which case we
1923 : * need to report the state as STATE_IDLEINTRANSACTION.
1924 : */
1925 1704 : if (IsTransactionOrTransactionBlock())
1926 496 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1927 : else
1928 1208 : pgstat_report_activity(STATE_IDLE, NULL);
1929 :
1930 1704 : reset_apply_error_context_info();
1931 1704 : }
1932 :
1933 : /*
1934 : * Helper function to handle STREAM ABORT message when the transaction was
1935 : * serialized to file.
1936 : */
1937 : static void
1938 28 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1939 : {
1940 : /*
1941 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1942 : * just delete the files with serialized info.
1943 : */
1944 28 : if (xid == subxid)
1945 2 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1946 : else
1947 : {
1948 : /*
1949 : * OK, so it's a subxact. We need to read the subxact file for the
1950 : * toplevel transaction, determine the offset tracked for the subxact,
1951 : * and truncate the file with changes. We also remove the subxacts
1952 : * with higher offsets (or rather higher XIDs).
1953 : *
1954 : * We intentionally scan the array from the tail, because we're likely
1955 : * aborting a change for the most recent subtransactions.
1956 : *
1957 : * We can't use a binary search here because subxact XIDs won't
1958 : * necessarily arrive in sorted order; consider the case where we have
1959 : * released the savepoints for multiple subtransactions and then
1960 : * performed a rollback to savepoint for one of the earlier
1961 : * subtransactions.
1962 : */
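: /*
: * A worked example (illustrative values only): if the subxact array holds
: * {xid=710 at (fileno=0, offset=1200), xid=715 at (fileno=0, offset=4800)}
: * and we abort subxid 715, the loop below finds subidx = 1, the changes
: * file is truncated back to (0, 4800), and nsubxacts is reset to 1 so the
: * later subxact entry is discarded.
: */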
1963 : int64 i;
1964 : int64 subidx;
1965 : BufFile *fd;
1966 26 : bool found = false;
1967 : char path[MAXPGPATH];
1968 :
1969 26 : subidx = -1;
1970 26 : begin_replication_step();
1971 26 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1972 :
1973 30 : for (i = subxact_data.nsubxacts; i > 0; i--)
1974 : {
1975 22 : if (subxact_data.subxacts[i - 1].xid == subxid)
1976 : {
1977 18 : subidx = (i - 1);
1978 18 : found = true;
1979 18 : break;
1980 : }
1981 : }
1982 :
1983 : /*
1984 : * If it's an empty sub-transaction then we will not find the subxid
1985 : * here, so just clean up the subxact info and return.
1986 : */
1987 26 : if (!found)
1988 : {
1989 : /* Cleanup the subxact info */
1990 8 : cleanup_subxact_info();
1991 8 : end_replication_step();
1992 8 : CommitTransactionCommand();
1993 8 : return;
1994 : }
1995 :
1996 : /* open the changes file */
1997 18 : changes_filename(path, MyLogicalRepWorker->subid, xid);
1998 18 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
1999 : O_RDWR, false);
2000 :
2001 : /* OK, truncate the file at the right offset */
2002 18 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
2003 18 : subxact_data.subxacts[subidx].offset);
2004 18 : BufFileClose(fd);
2005 :
2006 : /* discard the subxacts added later */
2007 18 : subxact_data.nsubxacts = subidx;
2008 :
2009 : /* write the updated subxact list */
2010 18 : subxact_info_write(MyLogicalRepWorker->subid, xid);
2011 :
2012 18 : end_replication_step();
2013 18 : CommitTransactionCommand();
2014 : }
2015 : }
2016 :
2017 : /*
2018 : * Handle STREAM ABORT message.
2019 : */
2020 : static void
2021 76 : apply_handle_stream_abort(StringInfo s)
2022 : {
2023 : TransactionId xid;
2024 : TransactionId subxid;
2025 : LogicalRepStreamAbortData abort_data;
2026 : ParallelApplyWorkerInfo *winfo;
2027 : TransApplyAction apply_action;
2028 :
2029 : /* Save the message before it is consumed. */
2030 76 : StringInfoData original_msg = *s;
2031 : bool toplevel_xact;
2032 :
2033 76 : if (in_streamed_transaction)
2034 0 : ereport(ERROR,
2035 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2036 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
2037 :
2038 : /* We receive abort information only when we can apply in parallel. */
2039 76 : logicalrep_read_stream_abort(s, &abort_data,
2040 76 : MyLogicalRepWorker->parallel_apply);
2041 :
2042 76 : xid = abort_data.xid;
2043 76 : subxid = abort_data.subxid;
2044 76 : toplevel_xact = (xid == subxid);
2045 :
2046 76 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2047 :
2048 76 : apply_action = get_transaction_apply_action(xid, &winfo);
2049 :
2050 76 : switch (apply_action)
2051 : {
2052 28 : case TRANS_LEADER_APPLY:
2053 :
2054 : /*
2055 : * We are in the leader apply worker and the transaction has been
2056 : * serialized to file.
2057 : */
2058 28 : stream_abort_internal(xid, subxid);
2059 :
2060 28 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2061 28 : break;
2062 :
2063 20 : case TRANS_LEADER_SEND_TO_PARALLEL:
2064 : Assert(winfo);
2065 :
2066 : /*
2067 : * For the case of aborting the subtransaction, we increment the
2068 : * number of streaming blocks and take the lock again before
2069 : * sending the STREAM_ABORT to ensure that the parallel apply
2070 : * worker will wait on the lock for the next set of changes after
2071 : * processing the STREAM_ABORT message if it is not already
2072 : * waiting for STREAM_STOP message.
2073 : *
2074 : * It is important to perform this locking before sending the
2075 : * STREAM_ABORT message so that the leader can hold the lock first
2076 : * and the parallel apply worker will wait for the leader to
2077 : * release the lock. This is the same as what we do in
2078 : * apply_handle_stream_stop. See Locking Considerations atop
2079 : * applyparallelworker.c.
2080 : */
2081 20 : if (!toplevel_xact)
2082 : {
2083 18 : pa_unlock_stream(xid, AccessExclusiveLock);
2084 18 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
2085 18 : pa_lock_stream(xid, AccessExclusiveLock);
2086 : }
2087 :
2088 20 : if (pa_send_data(winfo, s->len, s->data))
2089 : {
2090 : /*
2091 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't strictly
2092 : * need to wait here for the parallel apply worker to finish:
2093 : * waiting is not required to maintain commit order, and there
2094 : * is no risk of failures due to transaction dependencies or
2095 : * deadlocks. However, it is possible that, before the parallel
2096 : * worker finishes and we clear the worker info, an xid
2097 : * wraparound happens on the upstream and a new transaction
2098 : * with the same xid appears, which could lead to duplicate
2099 : * entries in ParallelApplyTxnHash. Yet another problem could
2100 : * be that we may have serialized the changes in partial
2101 : * serialize mode, so the file containing the xact changes may
2102 : * already exist, and after xid wraparound trying to create
2103 : * the file for the same xid can lead to an error. To avoid
2104 : * these problems, we decide to wait for the aborts to finish.
2105 : *
2106 : * Note that it is okay not to update the flush location for
2107 : * aborts, as in the worst case that only means such a
2108 : * transaction won't be sent again after a restart.
2109 : */
2110 20 : if (toplevel_xact)
2111 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2112 :
2113 20 : break;
2114 : }
2115 :
2116 : /*
2117 : * Switch to serialize mode when we are not able to send the
2118 : * change to parallel apply worker.
2119 : */
2120 0 : pa_switch_to_partial_serialize(winfo, true);
2121 :
2122 : /* fall through */
2123 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2124 : Assert(winfo);
2125 :
2126 : /*
2127 : * Parallel apply worker might have applied some changes, so write
2128 : * the STREAM_ABORT message so that it can rollback the
2129 : * subtransaction if needed.
2130 : */
2131 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
2132 : &original_msg);
2133 :
2134 4 : if (toplevel_xact)
2135 : {
2136 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2137 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2138 : }
2139 4 : break;
2140 :
2141 24 : case TRANS_PARALLEL_APPLY:
2142 :
2143 : /*
2144 : * If the parallel apply worker is applying spooled messages then
2145 : * close the file before aborting.
2146 : */
2147 24 : if (toplevel_xact && stream_fd)
2148 2 : stream_close_file();
2149 :
2150 24 : pa_stream_abort(&abort_data);
2151 :
2152 : /*
2153 : * We need to wait after processing rollback to savepoint for the
2154 : * next set of changes.
2155 : *
2156 : * We have a race condition here due to which we can start waiting
2157 : * here when there are more chunk of streams in the queue. See
2158 : * apply_handle_stream_stop.
2159 : */
2160 24 : if (!toplevel_xact)
2161 20 : pa_decr_and_wait_stream_block();
2162 :
2163 24 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2164 24 : break;
2165 :
2166 0 : default:
2167 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2168 : break;
2169 : }
2170 :
2171 76 : reset_apply_error_context_info();
2172 76 : }
2173 :
2174 : /*
2175 : * Ensure that the passed location is fileset's end.
2176 : */
2177 : static void
2178 8 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2179 : off_t offset)
2180 : {
2181 : char path[MAXPGPATH];
2182 : BufFile *fd;
2183 : int last_fileno;
2184 : off_t last_offset;
2185 :
2186 : Assert(!IsTransactionState());
2187 :
2188 8 : begin_replication_step();
2189 :
2190 8 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2191 :
2192 8 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2193 :
2194 8 : BufFileSeek(fd, 0, 0, SEEK_END);
2195 8 : BufFileTell(fd, &last_fileno, &last_offset);
2196 :
2197 8 : BufFileClose(fd);
2198 :
2199 8 : end_replication_step();
2200 :
2201 8 : if (last_fileno != fileno || last_offset != offset)
2202 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2203 : path);
2204 8 : }
2205 :
2206 : /*
2207 : * Common spoolfile processing.
2208 : */
2209 : void
2210 62 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2211 : XLogRecPtr lsn)
2212 : {
2213 : int nchanges;
2214 : char path[MAXPGPATH];
2215 62 : char *buffer = NULL;
2216 : MemoryContext oldcxt;
2217 : ResourceOwner oldowner;
2218 : int fileno;
2219 : off_t offset;
2220 :
2221 62 : if (!am_parallel_apply_worker())
2222 54 : maybe_start_skipping_changes(lsn);
2223 :
2224 : /* Make sure we have an open transaction */
2225 62 : begin_replication_step();
2226 :
2227 : /*
2228 : * Allocate the file handle and the memory required to process all the
2229 : * messages in TopTransactionContext, so that they do not get reset after
2230 : * each message is processed.
2231 : */
2232 62 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2233 :
2234 : /* Open the spool file for the committed/prepared transaction */
2235 62 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2236 62 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2237 :
2238 : /*
2239 : * Make sure the file is owned by the toplevel transaction so that the
2240 : * file will not be accidentally closed when aborting a subtransaction.
2241 : */
2242 62 : oldowner = CurrentResourceOwner;
2243 62 : CurrentResourceOwner = TopTransactionResourceOwner;
2244 :
2245 62 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2246 :
2247 62 : CurrentResourceOwner = oldowner;
2248 :
2249 62 : buffer = palloc(BLCKSZ);
2250 :
2251 62 : MemoryContextSwitchTo(oldcxt);
2252 :
2253 62 : remote_final_lsn = lsn;
2254 :
2255 : /*
2256 : * Make sure the apply_dispatch handlers are aware that we're in a remote
2257 : * transaction.
2258 : */
2259 62 : in_remote_transaction = true;
2260 62 : pgstat_report_activity(STATE_RUNNING, NULL);
2261 :
2262 62 : end_replication_step();
2263 :
2264 : /*
2265 : * Read the entries one by one and pass them through the same logic as in
2266 : * apply_dispatch.
2267 : */
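: /*
: * On-disk record layout in the spool file, as written by
: * stream_write_change() (described here from the read loop below, so treat
: * the exact framing as an assumption): each record is an int length
: * followed by that many bytes of logical replication message, i.e. the
: * action byte plus its payload, repeated until EOF.
: */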
2268 62 : nchanges = 0;
2269 : while (true)
2270 176940 : {
2271 : StringInfoData s2;
2272 : size_t nbytes;
2273 : int len;
2274 :
2275 177002 : CHECK_FOR_INTERRUPTS();
2276 :
2277 : /* read length of the on-disk record */
2278 177002 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2279 :
2280 : /* have we reached end of the file? */
2281 177002 : if (nbytes == 0)
2282 52 : break;
2283 :
2284 : /* do we have a correct length? */
2285 176950 : if (len <= 0)
2286 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2287 : len, path);
2288 :
2289 : /* make sure we have sufficiently large buffer */
2290 176950 : buffer = repalloc(buffer, len);
2291 :
2292 : /* and finally read the data into the buffer */
2293 176950 : BufFileReadExact(stream_fd, buffer, len);
2294 :
2295 176950 : BufFileTell(stream_fd, &fileno, &offset);
2296 :
2297 : /* init a stringinfo using the buffer and call apply_dispatch */
2298 176950 : initReadOnlyStringInfo(&s2, buffer, len);
2299 :
2300 : /* Ensure we are reading the data into our memory context. */
2301 176950 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2302 :
2303 176950 : apply_dispatch(&s2);
2304 :
2305 176948 : MemoryContextReset(ApplyMessageContext);
2306 :
2307 176948 : MemoryContextSwitchTo(oldcxt);
2308 :
2309 176948 : nchanges++;
2310 :
2311 : /*
2312 : * It is possible the file has been closed because we have processed a
2313 : * transaction end message such as stream_commit, in which case that
2314 : * must be the last message.
2315 : */
2316 176948 : if (!stream_fd)
2317 : {
2318 8 : ensure_last_message(stream_fileset, xid, fileno, offset);
2319 8 : break;
2320 : }
2321 :
2322 176940 : if (nchanges % 1000 == 0)
2323 166 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2324 : nchanges, path);
2325 : }
2326 :
2327 60 : if (stream_fd)
2328 52 : stream_close_file();
2329 :
2330 60 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2331 : nchanges, path);
2332 :
2333 60 : return;
2334 : }
2335 :
2336 : /*
2337 : * Handle STREAM COMMIT message.
2338 : */
2339 : static void
2340 122 : apply_handle_stream_commit(StringInfo s)
2341 : {
2342 : TransactionId xid;
2343 : LogicalRepCommitData commit_data;
2344 : ParallelApplyWorkerInfo *winfo;
2345 : TransApplyAction apply_action;
2346 :
2347 : /* Save the message before it is consumed. */
2348 122 : StringInfoData original_msg = *s;
2349 :
2350 122 : if (in_streamed_transaction)
2351 0 : ereport(ERROR,
2352 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2353 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2354 :
2355 122 : xid = logicalrep_read_stream_commit(s, &commit_data);
2356 122 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2357 :
2358 122 : apply_action = get_transaction_apply_action(xid, &winfo);
2359 :
2360 122 : switch (apply_action)
2361 : {
2362 44 : case TRANS_LEADER_APPLY:
2363 :
2364 : /*
2365 : * The transaction has been serialized to file, so replay all the
2366 : * spooled operations.
2367 : */
2368 44 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2369 : commit_data.commit_lsn);
2370 :
2371 42 : apply_handle_commit_internal(&commit_data);
2372 :
2373 : /* Unlink the files with serialized changes and subxact info. */
2374 42 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2375 :
2376 42 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2377 42 : break;
2378 :
2379 36 : case TRANS_LEADER_SEND_TO_PARALLEL:
2380 : Assert(winfo);
2381 :
2382 36 : if (pa_send_data(winfo, s->len, s->data))
2383 : {
2384 : /* Finish processing the streaming transaction. */
2385 36 : pa_xact_finish(winfo, commit_data.end_lsn);
2386 34 : break;
2387 : }
2388 :
2389 : /*
2390 : * Switch to serialize mode when we are not able to send the
2391 : * change to parallel apply worker.
2392 : */
2393 0 : pa_switch_to_partial_serialize(winfo, true);
2394 :
2395 : /* fall through */
2396 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2397 : Assert(winfo);
2398 :
2399 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2400 : &original_msg);
2401 :
2402 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2403 :
2404 : /* Finish processing the streaming transaction. */
2405 4 : pa_xact_finish(winfo, commit_data.end_lsn);
2406 4 : break;
2407 :
2408 38 : case TRANS_PARALLEL_APPLY:
2409 :
2410 : /*
2411 : * If the parallel apply worker is applying spooled messages then
2412 : * close the file before committing.
2413 : */
2414 38 : if (stream_fd)
2415 4 : stream_close_file();
2416 :
2417 38 : apply_handle_commit_internal(&commit_data);
2418 :
2419 38 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2420 :
2421 : /*
2422 : * It is important to set the transaction state as finished before
2423 : * releasing the lock. See pa_wait_for_xact_finish.
2424 : */
2425 38 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2426 38 : pa_unlock_transaction(xid, AccessExclusiveLock);
2427 :
2428 38 : pa_reset_subtrans();
2429 :
2430 38 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2431 38 : break;
2432 :
2433 0 : default:
2434 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2435 : break;
2436 : }
2437 :
2438 : /* Process any tables that are being synchronized in parallel. */
2439 118 : process_syncing_tables(commit_data.end_lsn);
2440 :
2441 118 : pgstat_report_activity(STATE_IDLE, NULL);
2442 :
2443 118 : reset_apply_error_context_info();
2444 118 : }
2445 :
2446 : /*
2447 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2448 : */
2449 : static void
2450 940 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2451 : {
2452 940 : if (is_skipping_changes())
2453 : {
2454 4 : stop_skipping_changes();
2455 :
2456 : /*
2457 : * Start a new transaction to clear the subskiplsn, if not started
2458 : * yet.
2459 : */
2460 4 : if (!IsTransactionState())
2461 2 : StartTransactionCommand();
2462 : }
2463 :
2464 940 : if (IsTransactionState())
2465 : {
2466 : /*
2467 : * The transaction is either non-empty or skipped, so we clear the
2468 : * subskiplsn.
2469 : */
2470 940 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2471 :
2472 : /*
2473 : * Update origin state so we can restart streaming from correct
2474 : * position in case of crash.
2475 : */
2476 940 : replorigin_session_origin_lsn = commit_data->end_lsn;
2477 940 : replorigin_session_origin_timestamp = commit_data->committime;
2478 :
2479 940 : CommitTransactionCommand();
2480 :
2481 940 : if (IsTransactionBlock())
2482 : {
2483 8 : EndTransactionBlock(false);
2484 8 : CommitTransactionCommand();
2485 : }
2486 :
2487 940 : pgstat_report_stat(false);
2488 :
2489 940 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2490 : }
2491 : else
2492 : {
2493 : /* Process any invalidation messages that might have accumulated. */
2494 0 : AcceptInvalidationMessages();
2495 0 : maybe_reread_subscription();
2496 : }
2497 :
2498 940 : in_remote_transaction = false;
2499 940 : }
2500 :
2501 : /*
2502 : * Handle RELATION message.
2503 : *
2504 : * Note we don't validate against the local schema here. That validation is
2505 : * postponed until the first change for the given relation arrives, since we
2506 : * only care about it when applying changes for that relation anyway, and we
2507 : * do less locking this way.
2508 : */
2509 : static void
2510 900 : apply_handle_relation(StringInfo s)
2511 : {
2512 : LogicalRepRelation *rel;
2513 :
2514 900 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2515 72 : return;
2516 :
2517 828 : rel = logicalrep_read_rel(s);
2518 828 : logicalrep_relmap_update(rel);
2519 :
2520 : /* Also reset all entries in the partition map that refer to remoterel. */
2521 828 : logicalrep_partmap_reset_relmap(rel);
2522 : }
2523 :
2524 : /*
2525 : * Handle TYPE message.
2526 : *
2527 : * This implementation pays no attention to TYPE messages; we expect the user
2528 : * to have set things up so that the incoming data is acceptable to the input
2529 : * functions for the locally subscribed tables. Hence, we just read and
2530 : * discard the message.
2531 : */
2532 : static void
2533 36 : apply_handle_type(StringInfo s)
2534 : {
2535 : LogicalRepTyp typ;
2536 :
2537 36 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2538 0 : return;
2539 :
2540 36 : logicalrep_read_typ(s, &typ);
2541 : }
2542 :
2543 : /*
2544 : * Check that we (the subscription owner) have sufficient privileges on the
2545 : * target relation to perform the given operation.
2546 : */
2547 : static void
2548 440604 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2549 : {
2550 : Oid relid;
2551 : AclResult aclresult;
2552 :
2553 440604 : relid = RelationGetRelid(rel);
2554 440604 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2555 440604 : if (aclresult != ACLCHECK_OK)
2556 14 : aclcheck_error(aclresult,
2557 14 : get_relkind_objtype(rel->rd_rel->relkind),
2558 14 : get_rel_name(relid));
2559 :
2560 : /*
2561 : * We lack the infrastructure to honor RLS policies. It might be possible
2562 : * to add such infrastructure here, but tablesync workers lack it, too, so
2563 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2564 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2565 : * replicate subsequent INSERTs, so we forbid all commands the same.
2566 : */
2567 440590 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2568 6 : ereport(ERROR,
2569 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2570 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2571 : GetUserNameFromId(GetUserId(), true),
2572 : RelationGetRelationName(rel))));
2573 440584 : }
2574 :
2575 : /*
2576 : * Handle INSERT message.
2577 : */
2578 :
2579 : static void
2580 371684 : apply_handle_insert(StringInfo s)
2581 : {
2582 : LogicalRepRelMapEntry *rel;
2583 : LogicalRepTupleData newtup;
2584 : LogicalRepRelId relid;
2585 : UserContext ucxt;
2586 : ApplyExecutionData *edata;
2587 : EState *estate;
2588 : TupleTableSlot *remoteslot;
2589 : MemoryContext oldctx;
2590 : bool run_as_owner;
2591 :
2592 : /*
2593 : * Quick return if we are skipping data modification changes or handling
2594 : * streamed transactions.
2595 : */
2596 723366 : if (is_skipping_changes() ||
2597 351682 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2598 220114 : return;
2599 :
2600 151670 : begin_replication_step();
2601 :
2602 151666 : relid = logicalrep_read_insert(s, &newtup);
2603 151666 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2604 151650 : if (!should_apply_changes_for_rel(rel))
2605 : {
2606 : /*
2607 : * The relation can't become interesting in the middle of the
2608 : * transaction so it's safe to unlock it.
2609 : */
2610 100 : logicalrep_rel_close(rel, RowExclusiveLock);
2611 100 : end_replication_step();
2612 100 : return;
2613 : }
2614 :
2615 : /*
2616 : * Make sure that any user-supplied code runs as the table owner, unless
2617 : * the user has opted out of that behavior.
2618 : */
2619 151550 : run_as_owner = MySubscription->runasowner;
2620 151550 : if (!run_as_owner)
2621 151534 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2622 :
2623 : /* Set relation for error callback */
2624 151550 : apply_error_callback_arg.rel = rel;
2625 :
2626 : /* Initialize the executor state. */
2627 151550 : edata = create_edata_for_relation(rel);
2628 151550 : estate = edata->estate;
2629 151550 : remoteslot = ExecInitExtraTupleSlot(estate,
2630 151550 : RelationGetDescr(rel->localrel),
2631 : &TTSOpsVirtual);
2632 :
2633 : /* Process and store remote tuple in the slot */
2634 151550 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2635 151550 : slot_store_data(remoteslot, rel, &newtup);
2636 151550 : slot_fill_defaults(rel, estate, remoteslot);
2637 151550 : MemoryContextSwitchTo(oldctx);
2638 :
2639 : /* For a partitioned table, insert the tuple into a partition. */
2640 151550 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2641 102 : apply_handle_tuple_routing(edata,
2642 : remoteslot, NULL, CMD_INSERT);
2643 : else
2644 : {
2645 151448 : ResultRelInfo *relinfo = edata->targetRelInfo;
2646 :
2647 151448 : ExecOpenIndices(relinfo, false);
2648 151448 : apply_handle_insert_internal(edata, relinfo, remoteslot);
2649 151416 : ExecCloseIndices(relinfo);
2650 : }
2651 :
2652 151504 : finish_edata(edata);
2653 :
2654 : /* Reset relation for error callback */
2655 151504 : apply_error_callback_arg.rel = NULL;
2656 :
2657 151504 : if (!run_as_owner)
2658 151494 : RestoreUserContext(&ucxt);
2659 :
2660 151504 : logicalrep_rel_close(rel, NoLock);
2661 :
2662 151504 : end_replication_step();
2663 : }
2664 :
2665 : /*
2666 : * Workhorse for apply_handle_insert()
2667 : * relinfo is for the relation we're actually inserting into
2668 : * (could be a child partition of edata->targetRelInfo)
2669 : */
2670 : static void
2671 151552 : apply_handle_insert_internal(ApplyExecutionData *edata,
2672 : ResultRelInfo *relinfo,
2673 : TupleTableSlot *remoteslot)
2674 : {
2675 151552 : EState *estate = edata->estate;
2676 :
2677 : /* Caller should have opened indexes already. */
2678 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2679 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2680 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2681 :
2682 : /* Caller will not have done this bit. */
2683 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2684 151552 : InitConflictIndexes(relinfo);
2685 :
2686 : /* Do the insert. */
2687 151552 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2688 151540 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2689 151506 : }
2690 :
2691 : /*
2692 : * Check if the logical replication relation is updatable and throw
2693 : * appropriate error if it isn't.
2694 : */
2695 : static void
2696 144574 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2697 : {
2698 : /*
2699 : * For partitioned tables, we only need to care if the target partition is
2700 : * updatable (aka has PK or RI defined for it).
2701 : */
2702 144574 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2703 60 : return;
2704 :
2705 : /* Updatable, no error. */
2706 144514 : if (rel->updatable)
2707 144514 : return;
2708 :
2709 : /*
2710 : * We are on the error path, so it's fine that this is somewhat slow. It's
2711 : * better to give the user the correct error.
2712 : */
2713 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2714 : {
2715 0 : ereport(ERROR,
2716 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2717 : errmsg("publisher did not send replica identity column "
2718 : "expected by the logical replication target relation \"%s.%s\"",
2719 : rel->remoterel.nspname, rel->remoterel.relname)));
2720 : }
2721 :
2722 0 : ereport(ERROR,
2723 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2724 : errmsg("logical replication target relation \"%s.%s\" has "
2725 : "neither REPLICA IDENTITY index nor PRIMARY "
2726 : "KEY and published relation does not have "
2727 : "REPLICA IDENTITY FULL",
2728 : rel->remoterel.nspname, rel->remoterel.relname)));
2729 : }
2730 :
2731 : /*
2732 : * Handle UPDATE message.
2733 : *
2734 : * TODO: FDW support
2735 : */
2736 : static void
2737 132322 : apply_handle_update(StringInfo s)
2738 : {
2739 : LogicalRepRelMapEntry *rel;
2740 : LogicalRepRelId relid;
2741 : UserContext ucxt;
2742 : ApplyExecutionData *edata;
2743 : EState *estate;
2744 : LogicalRepTupleData oldtup;
2745 : LogicalRepTupleData newtup;
2746 : bool has_oldtup;
2747 : TupleTableSlot *remoteslot;
2748 : RTEPermissionInfo *target_perminfo;
2749 : MemoryContext oldctx;
2750 : bool run_as_owner;
2751 :
2752 : /*
2753 : * Quick return if we are skipping data modification changes or handling
2754 : * streamed transactions.
2755 : */
2756 264638 : if (is_skipping_changes() ||
2757 132316 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
2758 68446 : return;
2759 :
2760 63876 : begin_replication_step();
2761 :
2762 63874 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2763 : &newtup);
2764 63874 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2765 63874 : if (!should_apply_changes_for_rel(rel))
2766 : {
2767 : /*
2768 : * The relation can't become interesting in the middle of the
2769 : * transaction so it's safe to unlock it.
2770 : */
2771 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2772 0 : end_replication_step();
2773 0 : return;
2774 : }
2775 :
2776 : /* Set relation for error callback */
2777 63874 : apply_error_callback_arg.rel = rel;
2778 :
2779 : /* Check if we can do the update. */
2780 63874 : check_relation_updatable(rel);
2781 :
2782 : /*
2783 : * Make sure that any user-supplied code runs as the table owner, unless
2784 : * the user has opted out of that behavior.
2785 : */
2786 63874 : run_as_owner = MySubscription->runasowner;
2787 63874 : if (!run_as_owner)
2788 63868 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2789 :
2790 : /* Initialize the executor state. */
2791 63872 : edata = create_edata_for_relation(rel);
2792 63872 : estate = edata->estate;
2793 63872 : remoteslot = ExecInitExtraTupleSlot(estate,
2794 63872 : RelationGetDescr(rel->localrel),
2795 : &TTSOpsVirtual);
2796 :
2797 : /*
2798 : * Populate updatedCols so that per-column triggers can fire, and so the
2799 : * executor can correctly pass down the indexUnchanged hint. This could
2800 : * include more columns than were actually changed on the publisher
2801 : * because the logical replication protocol doesn't contain that
2802 : * information. But it would for example exclude columns that only exist
2803 : * on the subscriber, since we are not touching those.
2804 : */
2805 63872 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
2806 318620 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2807 : {
2808 254748 : Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
2809 254748 : int remoteattnum = rel->attrmap->attnums[i];
2810 :
2811 254748 : if (!att->attisdropped && remoteattnum >= 0)
2812 : {
2813 : Assert(remoteattnum < newtup.ncols);
2814 137714 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2815 137708 : target_perminfo->updatedCols =
2816 137708 : bms_add_member(target_perminfo->updatedCols,
2817 : i + 1 - FirstLowInvalidHeapAttributeNumber);
2818 : }
2819 : }
2820 :
2821 : /* Build the search tuple. */
2822 63872 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2823 63872 : slot_store_data(remoteslot, rel,
2824 63872 : has_oldtup ? &oldtup : &newtup);
2825 63872 : MemoryContextSwitchTo(oldctx);
2826 :
2827 : /* For a partitioned table, apply update to correct partition. */
2828 63872 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2829 26 : apply_handle_tuple_routing(edata,
2830 : remoteslot, &newtup, CMD_UPDATE);
2831 : else
2832 63846 : apply_handle_update_internal(edata, edata->targetRelInfo,
2833 : remoteslot, &newtup, rel->localindexoid);
2834 :
2835 63860 : finish_edata(edata);
2836 :
2837 : /* Reset relation for error callback */
2838 63860 : apply_error_callback_arg.rel = NULL;
2839 :
2840 63860 : if (!run_as_owner)
2841 63856 : RestoreUserContext(&ucxt);
2842 :
2843 63860 : logicalrep_rel_close(rel, NoLock);
2844 :
2845 63860 : end_replication_step();
2846 : }
2847 :
2848 : /*
2849 : * Workhorse for apply_handle_update()
2850 : * relinfo is for the relation we're actually updating in
2851 : * (could be a child partition of edata->targetRelInfo)
2852 : */
2853 : static void
2854 63846 : apply_handle_update_internal(ApplyExecutionData *edata,
2855 : ResultRelInfo *relinfo,
2856 : TupleTableSlot *remoteslot,
2857 : LogicalRepTupleData *newtup,
2858 : Oid localindexoid)
2859 : {
2860 63846 : EState *estate = edata->estate;
2861 63846 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2862 63846 : Relation localrel = relinfo->ri_RelationDesc;
2863 : EPQState epqstate;
2864 63846 : TupleTableSlot *localslot = NULL;
2865 63846 : ConflictTupleInfo conflicttuple = {0};
2866 : bool found;
2867 : MemoryContext oldctx;
2868 :
2869 63846 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2870 63846 : ExecOpenIndices(relinfo, false);
2871 :
2872 63846 : found = FindReplTupleInLocalRel(edata, localrel,
2873 : &relmapentry->remoterel,
2874 : localindexoid,
2875 : remoteslot, &localslot);
2876 :
2877 : /*
2878 : * Tuple found.
2879 : *
2880 : * Note this will fail if there are other conflicting unique indexes.
2881 : */
2882 63838 : if (found)
2883 : {
2884 : /*
2885 : * Report the conflict if the tuple was modified by a different
2886 : * origin.
2887 : */
2888 63826 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2889 4 : &conflicttuple.origin, &conflicttuple.ts) &&
2890 4 : conflicttuple.origin != replorigin_session_origin)
2891 : {
2892 : TupleTableSlot *newslot;
2893 :
2894 : /* Store the new tuple for conflict reporting */
2895 4 : newslot = table_slot_create(localrel, &estate->es_tupleTable);
2896 4 : slot_store_data(newslot, relmapentry, newtup);
2897 :
2898 4 : conflicttuple.slot = localslot;
2899 :
2900 4 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
2901 : remoteslot, newslot,
2902 4 : list_make1(&conflicttuple));
2903 : }
2904 :
2905 : /* Process and store remote tuple in the slot */
2906 63826 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2907 63826 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2908 63826 : MemoryContextSwitchTo(oldctx);
2909 :
2910 63826 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2911 :
2912 63826 : InitConflictIndexes(relinfo);
2913 :
2914 : /* Do the actual update. */
2915 63826 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2916 63826 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2917 : remoteslot);
2918 : }
2919 : else
2920 : {
2921 : ConflictType type;
2922 12 : TupleTableSlot *newslot = localslot;
2923 :
2924 : /*
2925 : * Detecting whether the tuple was recently deleted or never existed
2926 : * is crucial to avoid misleading the user during conflict handling.
2927 : */
2928 12 : if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
2929 : &conflicttuple.xmin,
2930 : &conflicttuple.origin,
2931 4 : &conflicttuple.ts) &&
2932 4 : conflicttuple.origin != replorigin_session_origin)
2933 4 : type = CT_UPDATE_DELETED;
2934 : else
2935 8 : type = CT_UPDATE_MISSING;
2936 :
2937 : /* Store the new tuple for conflict reporting */
2938 12 : slot_store_data(newslot, relmapentry, newtup);
2939 :
2940 : /*
2941 : * The tuple to be updated could not be found or was deleted. Do
2942 : * nothing except for emitting a log message.
2943 : */
2944 12 : ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
2945 12 : list_make1(&conflicttuple));
2946 : }
2947 :
2948 : /* Cleanup. */
2949 63834 : ExecCloseIndices(relinfo);
2950 63834 : EvalPlanQualEnd(&epqstate);
2951 63834 : }
2952 :
2953 : /*
2954 : * Handle DELETE message.
2955 : *
2956 : * TODO: FDW support
2957 : */
2958 : static void
2959 163870 : apply_handle_delete(StringInfo s)
2960 : {
2961 : LogicalRepRelMapEntry *rel;
2962 : LogicalRepTupleData oldtup;
2963 : LogicalRepRelId relid;
2964 : UserContext ucxt;
2965 : ApplyExecutionData *edata;
2966 : EState *estate;
2967 : TupleTableSlot *remoteslot;
2968 : MemoryContext oldctx;
2969 : bool run_as_owner;
2970 :
2971 : /*
2972 : * Quick return if we are skipping data modification changes or handling
2973 : * streamed transactions.
2974 : */
2975 327740 : if (is_skipping_changes() ||
2976 163870 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
2977 83230 : return;
2978 :
2979 80640 : begin_replication_step();
2980 :
2981 80640 : relid = logicalrep_read_delete(s, &oldtup);
2982 80640 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2983 80640 : if (!should_apply_changes_for_rel(rel))
2984 : {
2985 : /*
2986 : * The relation can't become interesting in the middle of the
2987 : * transaction so it's safe to unlock it.
2988 : */
2989 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2990 0 : end_replication_step();
2991 0 : return;
2992 : }
2993 :
2994 : /* Set relation for error callback */
2995 80640 : apply_error_callback_arg.rel = rel;
2996 :
2997 : /* Check if we can do the delete. */
2998 80640 : check_relation_updatable(rel);
2999 :
3000 : /*
3001 : * Make sure that any user-supplied code runs as the table owner, unless
3002 : * the user has opted out of that behavior.
3003 : */
3004 80640 : run_as_owner = MySubscription->runasowner;
3005 80640 : if (!run_as_owner)
3006 80636 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
3007 :
3008 : /* Initialize the executor state. */
3009 80640 : edata = create_edata_for_relation(rel);
3010 80640 : estate = edata->estate;
3011 80640 : remoteslot = ExecInitExtraTupleSlot(estate,
3012 80640 : RelationGetDescr(rel->localrel),
3013 : &TTSOpsVirtual);
3014 :
3015 : /* Build the search tuple. */
3016 80640 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3017 80640 : slot_store_data(remoteslot, rel, &oldtup);
3018 80640 : MemoryContextSwitchTo(oldctx);
3019 :
3020 : /* For a partitioned table, apply delete to correct partition. */
3021 80640 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3022 34 : apply_handle_tuple_routing(edata,
3023 : remoteslot, NULL, CMD_DELETE);
3024 : else
3025 : {
3026 80606 : ResultRelInfo *relinfo = edata->targetRelInfo;
3027 :
3028 80606 : ExecOpenIndices(relinfo, false);
3029 80606 : apply_handle_delete_internal(edata, relinfo,
3030 : remoteslot, rel->localindexoid);
3031 80606 : ExecCloseIndices(relinfo);
3032 : }
3033 :
3034 80640 : finish_edata(edata);
3035 :
3036 : /* Reset relation for error callback */
3037 80640 : apply_error_callback_arg.rel = NULL;
3038 :
3039 80640 : if (!run_as_owner)
3040 80636 : RestoreUserContext(&ucxt);
3041 :
3042 80640 : logicalrep_rel_close(rel, NoLock);
3043 :
3044 80640 : end_replication_step();
3045 : }
3046 :
3047 : /*
3048 : * Workhorse for apply_handle_delete()
3049 : * relinfo is for the relation we're actually deleting from
3050 : * (could be a child partition of edata->targetRelInfo)
3051 : */
3052 : static void
3053 80640 : apply_handle_delete_internal(ApplyExecutionData *edata,
3054 : ResultRelInfo *relinfo,
3055 : TupleTableSlot *remoteslot,
3056 : Oid localindexoid)
3057 : {
3058 80640 : EState *estate = edata->estate;
3059 80640 : Relation localrel = relinfo->ri_RelationDesc;
3060 80640 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3061 : EPQState epqstate;
3062 : TupleTableSlot *localslot;
3063 80640 : ConflictTupleInfo conflicttuple = {0};
3064 : bool found;
3065 :
3066 80640 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3067 :
3068 : /* Caller should have opened indexes already. */
3069 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
3070 : !localrel->rd_rel->relhasindex ||
3071 : RelationGetIndexList(localrel) == NIL);
3072 :
3073 80640 : found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3074 : remoteslot, &localslot);
3075 :
3076 :  /* If found, delete it. */
3077 80640 : if (found)
3078 : {
3079 : /*
3080 : * Report the conflict if the tuple was modified by a different
3081 : * origin.
3082 : */
3083 80622 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3084 10 : &conflicttuple.origin, &conflicttuple.ts) &&
3085 10 : conflicttuple.origin != replorigin_session_origin)
3086 : {
3087 8 : conflicttuple.slot = localslot;
3088 8 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
3089 : remoteslot, NULL,
3090 8 : list_make1(&conflicttuple));
3091 : }
3092 :
3093 80622 : EvalPlanQualSetSlot(&epqstate, localslot);
3094 :
3095 : /* Do the actual delete. */
3096 80622 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3097 80622 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3098 : }
3099 : else
3100 : {
3101 : /*
3102 : * The tuple to be deleted could not be found. Do nothing except for
3103 : * emitting a log message.
3104 : */
3105 18 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
3106 18 : remoteslot, NULL, list_make1(&conflicttuple));
3107 : }
3108 :
3109 : /* Cleanup. */
3110 80640 : EvalPlanQualEnd(&epqstate);
3111 80640 : }
3112 :
3113 : /*
3114 : * Try to find a tuple received from the publication side (in 'remoteslot') in
3115 :  * the corresponding local relation using the replica identity index, the
3116 :  * primary key, another usable index, or, if needed, a sequential scan.
3117 : *
3118 : * Local tuple, if found, is returned in '*localslot'.
3119 : */
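 : /*
 :  * Both lookup paths below lock a matching tuple with LockTupleExclusive,
 :  * since the caller is about to update or delete it.
 :  */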
3120 : static bool
3121 144512 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
3122 : LogicalRepRelation *remoterel,
3123 : Oid localidxoid,
3124 : TupleTableSlot *remoteslot,
3125 : TupleTableSlot **localslot)
3126 : {
3127 144512 : EState *estate = edata->estate;
3128 : bool found;
3129 :
3130 : /*
3131 : * Regardless of the top-level operation, we're performing a read here, so
3132 : * check for SELECT privileges.
3133 : */
3134 144512 : TargetPrivilegesCheck(localrel, ACL_SELECT);
3135 :
3136 144504 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3137 :
3138 : Assert(OidIsValid(localidxoid) ||
3139 : (remoterel->replident == REPLICA_IDENTITY_FULL));
3140 :
3141 144504 : if (OidIsValid(localidxoid))
3142 : {
3143 : #ifdef USE_ASSERT_CHECKING
3144 : Relation idxrel = index_open(localidxoid, AccessShareLock);
3145 :
3146 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3147 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
3148 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
3149 : IsIndexUsableForReplicaIdentityFull(idxrel,
3150 : edata->targetRel->attrmap)));
3151 : index_close(idxrel, AccessShareLock);
3152 : #endif
3153 :
3154 144204 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
3155 : LockTupleExclusive,
3156 : remoteslot, *localslot);
3157 : }
3158 : else
3159 300 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
3160 : remoteslot, *localslot);
3161 :
3162 144504 : return found;
3163 : }
3164 :
3165 : /*
3166 : * Determine whether the index can reliably locate the deleted tuple in the
3167 : * local relation.
3168 : *
3169 : * An index may exclude deleted tuples if it was re-indexed or re-created during
3170 : * change application. Therefore, an index is considered usable only if the
3171 : * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
3172 : * index tuple's xmin. This ensures that any tuples deleted prior to the index
3173 : * creation or re-indexing are not relevant for conflict detection in the
3174 : * current apply worker.
3175 : *
3176 : * Note that indexes may also be excluded if they were modified by other DDL
3177 : * operations, such as ALTER INDEX. However, this is acceptable, as the
3178 : * likelihood of such DDL changes coinciding with the need to scan dead
3179 :  * tuples for update_deleted detection is low.
3180 : */
3181 : static bool
3182 2 : IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
3183 : TransactionId conflict_detection_xmin)
3184 : {
3185 : HeapTuple index_tuple;
3186 : TransactionId index_xmin;
3187 :
3188 2 : index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
3189 :
3190 2 : if (!HeapTupleIsValid(index_tuple)) /* should not happen */
3191 0 : elog(ERROR, "cache lookup failed for index %u", localindexoid);
3192 :
3193 : /*
3194 : * No need to check for a frozen transaction ID, as
3195 : * TransactionIdPrecedes() manages it internally, treating it as falling
3196 : * behind the conflict_detection_xmin.
3197 : */
3198 2 : index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
3199 :
3200 2 : ReleaseSysCache(index_tuple);
3201 :
3202 2 : return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
3203 : }
3204 :
3205 : /*
3206 : * Attempts to locate a deleted tuple in the local relation that matches the
3207 : * values of the tuple received from the publication side (in 'remoteslot').
3208 : * The search is performed using either the replica identity index, primary
3209 : * key, other available index, or a sequential scan if necessary.
3210 :  * key, another available index, or, if necessary, a sequential scan.
3211 : * Returns true if the deleted tuple is found. If found, the transaction ID,
3212 : * origin, and commit timestamp of the deletion are stored in '*delete_xid',
3213 : * '*delete_origin', and '*delete_time' respectively.
3214 : */
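 : /*
 :  * The search below runs only when the subscription retains dead tuples and
 :  * track_commit_timestamp is enabled; otherwise we return false right away.
 :  * The conflict-detection slot's xmin serves as the visibility horizon, and
 :  * an index is used only if it predates that horizon (see
 :  * IsIndexUsableForFindingDeletedTuple); otherwise we fall back to a
 :  * sequential scan.
 :  */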
3215 : static bool
3216 16 : FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
3217 : TupleTableSlot *remoteslot,
3218 : TransactionId *delete_xid, RepOriginId *delete_origin,
3219 : TimestampTz *delete_time)
3220 : {
3221 : TransactionId oldestxmin;
3222 : ReplicationSlot *slot;
3223 :
3224 : /*
3225 : * Return false if either dead tuples are not retained or commit timestamp
3226 : * data is not available.
3227 : */
3228 16 : if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
3229 12 : return false;
3230 :
3231 : /*
3232 : * For conflict detection, we use the conflict slot's xmin value instead
3233 : * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as
3234 : * a threshold to identify tuples that were recently deleted. These tuples
3235 : * are not visible to concurrent transactions, but we log an
3236 : * update_deleted conflict if such a tuple matches the remote update being
3237 : * applied.
3238 : *
3239 : * Although GetOldestNonRemovableTransactionId() can return a value older
3240 : * than the slot's xmin, for our current purpose it is acceptable to treat
3241 : * tuples deleted by transactions prior to slot.xmin as update_missing
3242 : * conflicts.
3243 : *
3244 : * Ideally, we would use oldest_nonremovable_xid, which is directly
3245 : * maintained by the leader apply worker. However, this value is not
3246 : * available to table synchronization or parallel apply workers, making
3247 : * slot.xmin a practical alternative in those contexts.
3248 : */
3249 4 : slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true);
3250 :
3251 : Assert(slot);
3252 :
3253 4 : SpinLockAcquire(&slot->mutex);
3254 4 : oldestxmin = slot->data.xmin;
3255 4 : SpinLockRelease(&slot->mutex);
3256 :
3257 : Assert(TransactionIdIsValid(oldestxmin));
3258 :
3259 6 : if (OidIsValid(localidxoid) &&
3260 2 : IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
3261 2 : return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
3262 : remoteslot, oldestxmin,
3263 : delete_xid, delete_origin,
3264 : delete_time);
3265 : else
3266 2 : return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
3267 : oldestxmin, delete_xid,
3268 : delete_origin, delete_time);
3269 : }
3270 :
3271 : /*
3272 : * This handles insert, update, delete on a partitioned table.
3273 : */
3274 : static void
3275 162 : apply_handle_tuple_routing(ApplyExecutionData *edata,
3276 : TupleTableSlot *remoteslot,
3277 : LogicalRepTupleData *newtup,
3278 : CmdType operation)
3279 : {
3280 162 : EState *estate = edata->estate;
3281 162 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3282 162 : ResultRelInfo *relinfo = edata->targetRelInfo;
3283 162 : Relation parentrel = relinfo->ri_RelationDesc;
3284 : ModifyTableState *mtstate;
3285 : PartitionTupleRouting *proute;
3286 : ResultRelInfo *partrelinfo;
3287 : Relation partrel;
3288 : TupleTableSlot *remoteslot_part;
3289 : TupleConversionMap *map;
3290 : MemoryContext oldctx;
3291 162 : LogicalRepRelMapEntry *part_entry = NULL;
3292 162 : AttrMap *attrmap = NULL;
3293 :
3294 : /* ModifyTableState is needed for ExecFindPartition(). */
3295 162 : edata->mtstate = mtstate = makeNode(ModifyTableState);
3296 162 : mtstate->ps.plan = NULL;
3297 162 : mtstate->ps.state = estate;
3298 162 : mtstate->operation = operation;
3299 162 : mtstate->resultRelInfo = relinfo;
3300 :
3301 : /* ... as is PartitionTupleRouting. */
3302 162 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3303 :
3304 : /*
3305 : * Find the partition to which the "search tuple" belongs.
3306 : */
3307 : Assert(remoteslot != NULL);
3308 162 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3309 162 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3310 : remoteslot, estate);
3311 : Assert(partrelinfo != NULL);
3312 162 : partrel = partrelinfo->ri_RelationDesc;
3313 :
3314 : /*
3315 : * Check for supported relkind. We need this since partitions might be of
3316 : * unsupported relkinds; and the set of partitions can change, so checking
3317 : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3318 : */
3319 162 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3320 162 : get_namespace_name(RelationGetNamespace(partrel)),
3321 162 : RelationGetRelationName(partrel));
3322 :
3323 : /*
3324 : * To perform any of the operations below, the tuple must match the
3325 : * partition's rowtype. Convert if needed or just copy, using a dedicated
3326 : * slot to store the tuple in any case.
3327 : */
3328 162 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3329 162 : if (remoteslot_part == NULL)
3330 96 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3331 162 : map = ExecGetRootToChildMap(partrelinfo, estate);
3332 162 : if (map != NULL)
3333 : {
3334 66 : attrmap = map->attrMap;
3335 66 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3336 : remoteslot_part);
3337 : }
3338 : else
3339 : {
3340 96 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3341 96 : slot_getallattrs(remoteslot_part);
3342 : }
3343 162 : MemoryContextSwitchTo(oldctx);
3344 :
3345 : /* Check if we can do the update or delete on the leaf partition. */
3346 162 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
3347 : {
3348 60 : part_entry = logicalrep_partition_open(relmapentry, partrel,
3349 : attrmap);
3350 60 : check_relation_updatable(part_entry);
3351 : }
3352 :
3353 162 : switch (operation)
3354 : {
3355 102 : case CMD_INSERT:
3356 102 : apply_handle_insert_internal(edata, partrelinfo,
3357 : remoteslot_part);
3358 88 : break;
3359 :
3360 34 : case CMD_DELETE:
3361 34 : apply_handle_delete_internal(edata, partrelinfo,
3362 : remoteslot_part,
3363 : part_entry->localindexoid);
3364 34 : break;
3365 :
3366 26 : case CMD_UPDATE:
3367 :
3368 : /*
3369 : * For UPDATE, depending on whether or not the updated tuple
3370 : * satisfies the partition's constraint, perform a simple UPDATE
3371 : * of the partition or move the updated tuple into a different
3372 : * suitable partition.
3373 : */
3374 : {
3375 : TupleTableSlot *localslot;
3376 : ResultRelInfo *partrelinfo_new;
3377 : Relation partrel_new;
3378 : bool found;
3379 : EPQState epqstate;
3380 26 : ConflictTupleInfo conflicttuple = {0};
3381 :
3382 : /* Get the matching local tuple from the partition. */
3383 26 : found = FindReplTupleInLocalRel(edata, partrel,
3384 : &part_entry->remoterel,
3385 : part_entry->localindexoid,
3386 : remoteslot_part, &localslot);
3387 26 : if (!found)
3388 : {
3389 : ConflictType type;
3390 4 : TupleTableSlot *newslot = localslot;
3391 :
3392 : /*
3393 : * Detecting whether the tuple was recently deleted or
3394 : * never existed is crucial to avoid misleading the user
3395 : * during conflict handling.
3396 : */
3397 4 : if (FindDeletedTupleInLocalRel(partrel,
3398 : part_entry->localindexoid,
3399 : remoteslot_part,
3400 : &conflicttuple.xmin,
3401 : &conflicttuple.origin,
3402 0 : &conflicttuple.ts) &&
3403 0 : conflicttuple.origin != replorigin_session_origin)
3404 0 : type = CT_UPDATE_DELETED;
3405 : else
3406 4 : type = CT_UPDATE_MISSING;
3407 :
3408 : /* Store the new tuple for conflict reporting */
3409 4 : slot_store_data(newslot, part_entry, newtup);
3410 :
3411 : /*
3412 : * The tuple to be updated could not be found or was
3413 : * deleted. Do nothing except for emitting a log message.
3414 : */
3415 4 : ReportApplyConflict(estate, partrelinfo, LOG,
3416 : type, remoteslot_part, newslot,
3417 4 : list_make1(&conflicttuple));
3418 :
3419 4 : return;
3420 : }
3421 :
3422 : /*
3423 : * Report the conflict if the tuple was modified by a
3424 : * different origin.
3425 : */
3426 22 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3427 : &conflicttuple.origin,
3428 2 : &conflicttuple.ts) &&
3429 2 : conflicttuple.origin != replorigin_session_origin)
3430 : {
3431 : TupleTableSlot *newslot;
3432 :
3433 : /* Store the new tuple for conflict reporting */
3434 2 : newslot = table_slot_create(partrel, &estate->es_tupleTable);
3435 2 : slot_store_data(newslot, part_entry, newtup);
3436 :
3437 2 : conflicttuple.slot = localslot;
3438 :
3439 2 : ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3440 : remoteslot_part, newslot,
3441 2 : list_make1(&conflicttuple));
3442 : }
3443 :
3444 : /*
3445 : * Apply the update to the local tuple, putting the result in
3446 : * remoteslot_part.
3447 : */
3448 22 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3449 22 : slot_modify_data(remoteslot_part, localslot, part_entry,
3450 : newtup);
3451 22 : MemoryContextSwitchTo(oldctx);
3452 :
3453 22 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3454 :
3455 : /*
3456 : * Does the updated tuple still satisfy the current
3457 : * partition's constraint?
3458 : */
3459 44 : if (!partrel->rd_rel->relispartition ||
3460 22 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3461 : false))
3462 : {
3463 : /*
3464 : * Yes, so simply UPDATE the partition. We don't call
3465 :                                  * apply_handle_update_internal() here, which would
3466 :                                  * normally do this work, in order to avoid repeating the
3467 :                                  * work already done above to find the local tuple in the
3468 :                                  * partition.
3469 : */
3470 20 : InitConflictIndexes(partrelinfo);
3471 :
3472 20 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3473 20 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3474 : ACL_UPDATE);
3475 20 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3476 : localslot, remoteslot_part);
3477 : }
3478 : else
3479 : {
3480 : /* Move the tuple into the new partition. */
3481 :
3482 : /*
3483 : * New partition will be found using tuple routing, which
3484 : * can only occur via the parent table. We might need to
3485 : * convert the tuple to the parent's rowtype. Note that
3486 : * this is the tuple found in the partition, not the
3487 : * original search tuple received by this function.
3488 : */
3489 2 : if (map)
3490 : {
3491 : TupleConversionMap *PartitionToRootMap =
3492 2 : convert_tuples_by_name(RelationGetDescr(partrel),
3493 : RelationGetDescr(parentrel));
3494 :
3495 : remoteslot =
3496 2 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3497 : remoteslot_part, remoteslot);
3498 : }
3499 : else
3500 : {
3501 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3502 0 : slot_getallattrs(remoteslot);
3503 : }
3504 :
3505 : /* Find the new partition. */
3506 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3507 2 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3508 : proute, remoteslot,
3509 : estate);
3510 2 : MemoryContextSwitchTo(oldctx);
3511 : Assert(partrelinfo_new != partrelinfo);
3512 2 : partrel_new = partrelinfo_new->ri_RelationDesc;
3513 :
3514 : /* Check that new partition also has supported relkind. */
3515 2 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3516 2 : get_namespace_name(RelationGetNamespace(partrel_new)),
3517 2 : RelationGetRelationName(partrel_new));
3518 :
3519 : /* DELETE old tuple found in the old partition. */
3520 2 : EvalPlanQualSetSlot(&epqstate, localslot);
3521 2 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3522 2 : ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3523 :
3524 : /* INSERT new tuple into the new partition. */
3525 :
3526 : /*
3527 : * Convert the replacement tuple to match the destination
3528 : * partition rowtype.
3529 : */
3530 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3531 2 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3532 2 : if (remoteslot_part == NULL)
3533 2 : remoteslot_part = table_slot_create(partrel_new,
3534 : &estate->es_tupleTable);
3535 2 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
3536 2 : if (map != NULL)
3537 : {
3538 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3539 : remoteslot,
3540 : remoteslot_part);
3541 : }
3542 : else
3543 : {
3544 2 : remoteslot_part = ExecCopySlot(remoteslot_part,
3545 : remoteslot);
3546 2 : slot_getallattrs(remoteslot);
3547 : }
3548 2 : MemoryContextSwitchTo(oldctx);
3549 2 : apply_handle_insert_internal(edata, partrelinfo_new,
3550 : remoteslot_part);
3551 : }
3552 :
3553 22 : EvalPlanQualEnd(&epqstate);
3554 : }
3555 22 : break;
3556 :
3557 0 : default:
3558 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3559 : break;
3560 : }
3561 : }
3562 :
3563 : /*
3564 : * Handle TRUNCATE message.
3565 : *
3566 : * TODO: FDW support
3567 : */
3568 : static void
3569 38 : apply_handle_truncate(StringInfo s)
3570 : {
3571 38 : bool cascade = false;
3572 38 : bool restart_seqs = false;
3573 38 : List *remote_relids = NIL;
3574 38 : List *remote_rels = NIL;
3575 38 : List *rels = NIL;
3576 38 : List *part_rels = NIL;
3577 38 : List *relids = NIL;
3578 38 : List *relids_logged = NIL;
3579 : ListCell *lc;
3580 38 : LOCKMODE lockmode = AccessExclusiveLock;
3581 :
3582 : /*
3583 : * Quick return if we are skipping data modification changes or handling
3584 : * streamed transactions.
3585 : */
3586 76 : if (is_skipping_changes() ||
3587 38 : handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
3588 0 : return;
3589 :
3590 38 : begin_replication_step();
3591 :
3592 38 : remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3593 :
3594 94 : foreach(lc, remote_relids)
3595 : {
3596 56 : LogicalRepRelId relid = lfirst_oid(lc);
3597 : LogicalRepRelMapEntry *rel;
3598 :
3599 56 : rel = logicalrep_rel_open(relid, lockmode);
3600 56 : if (!should_apply_changes_for_rel(rel))
3601 : {
3602 : /*
3603 : * The relation can't become interesting in the middle of the
3604 : * transaction so it's safe to unlock it.
3605 : */
3606 0 : logicalrep_rel_close(rel, lockmode);
3607 0 : continue;
3608 : }
3609 :
3610 56 : remote_rels = lappend(remote_rels, rel);
3611 56 : TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
3612 56 : rels = lappend(rels, rel->localrel);
3613 56 : relids = lappend_oid(relids, rel->localreloid);
3614 56 : if (RelationIsLogicallyLogged(rel->localrel))
3615 0 : relids_logged = lappend_oid(relids_logged, rel->localreloid);
3616 :
3617 : /*
3618 : * Truncate partitions if we got a message to truncate a partitioned
3619 : * table.
3620 : */
3621 56 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3622 : {
3623 : ListCell *child;
3624 8 : List *children = find_all_inheritors(rel->localreloid,
3625 : lockmode,
3626 : NULL);
3627 :
3628 30 : foreach(child, children)
3629 : {
3630 22 : Oid childrelid = lfirst_oid(child);
3631 : Relation childrel;
3632 :
3633 22 : if (list_member_oid(relids, childrelid))
3634 8 : continue;
3635 :
3636 : /* find_all_inheritors already got lock */
3637 14 : childrel = table_open(childrelid, NoLock);
3638 :
3639 : /*
3640 : * Ignore temp tables of other backends. See similar code in
3641 : * ExecuteTruncate().
3642 : */
3643 14 : if (RELATION_IS_OTHER_TEMP(childrel))
3644 : {
3645 0 : table_close(childrel, lockmode);
3646 0 : continue;
3647 : }
3648 :
3649 14 : TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
3650 14 : rels = lappend(rels, childrel);
3651 14 : part_rels = lappend(part_rels, childrel);
3652 14 : relids = lappend_oid(relids, childrelid);
3653 : /* Log this relation only if needed for logical decoding */
3654 14 : if (RelationIsLogicallyLogged(childrel))
3655 0 : relids_logged = lappend_oid(relids_logged, childrelid);
3656 : }
3657 : }
3658 : }
3659 :
3660 : /*
3661 :  * Even if we used CASCADE on the upstream primary, we explicitly default
3662 :  * to replaying changes without further cascading. This might later be
3663 :  * made configurable via a user-specified option.
3664 : *
3665 : * MySubscription->runasowner tells us whether we want to execute
3666 : * replication actions as the subscription owner; the last argument to
3667 : * TruncateGuts tells it whether we want to switch to the table owner.
3668 : * Those are exactly opposite conditions.
3669 : */
3670 38 : ExecuteTruncateGuts(rels,
3671 : relids,
3672 : relids_logged,
3673 : DROP_RESTRICT,
3674 : restart_seqs,
3675 38 : !MySubscription->runasowner);
3676 94 : foreach(lc, remote_rels)
3677 : {
3678 56 : LogicalRepRelMapEntry *rel = lfirst(lc);
3679 :
3680 56 : logicalrep_rel_close(rel, NoLock);
3681 : }
3682 52 : foreach(lc, part_rels)
3683 : {
3684 14 : Relation rel = lfirst(lc);
3685 :
3686 14 : table_close(rel, NoLock);
3687 : }
3688 :
3689 38 : end_replication_step();
3690 : }
3691 :
3692 :
3693 : /*
3694 : * Logical replication protocol message dispatcher.
3695 : */
3696 : void
3697 674410 : apply_dispatch(StringInfo s)
3698 : {
3699 674410 : LogicalRepMsgType action = pq_getmsgbyte(s);
3700 : LogicalRepMsgType saved_command;
3701 :
3702 : /*
3703 : * Set the current command being applied. Since this function can be
3704 : * called recursively when applying spooled changes, save the current
3705 : * command.
3706 : */
3707 674410 : saved_command = apply_error_callback_arg.command;
3708 674410 : apply_error_callback_arg.command = action;
3709 :
3710 674410 : switch (action)
3711 : {
3712 936 : case LOGICAL_REP_MSG_BEGIN:
3713 936 : apply_handle_begin(s);
3714 936 : break;
3715 :
3716 860 : case LOGICAL_REP_MSG_COMMIT:
3717 860 : apply_handle_commit(s);
3718 860 : break;
3719 :
3720 371684 : case LOGICAL_REP_MSG_INSERT:
3721 371684 : apply_handle_insert(s);
3722 371618 : break;
3723 :
3724 132322 : case LOGICAL_REP_MSG_UPDATE:
3725 132322 : apply_handle_update(s);
3726 132306 : break;
3727 :
3728 163870 : case LOGICAL_REP_MSG_DELETE:
3729 163870 : apply_handle_delete(s);
3730 163870 : break;
3731 :
3732 38 : case LOGICAL_REP_MSG_TRUNCATE:
3733 38 : apply_handle_truncate(s);
3734 38 : break;
3735 :
3736 900 : case LOGICAL_REP_MSG_RELATION:
3737 900 : apply_handle_relation(s);
3738 900 : break;
3739 :
3740 36 : case LOGICAL_REP_MSG_TYPE:
3741 36 : apply_handle_type(s);
3742 36 : break;
3743 :
3744 14 : case LOGICAL_REP_MSG_ORIGIN:
3745 14 : apply_handle_origin(s);
3746 14 : break;
3747 :
3748 0 : case LOGICAL_REP_MSG_MESSAGE:
3749 :
3750 : /*
3751 : * Logical replication does not use generic logical messages yet.
3752 :              * However, they could be used by other applications that use this
3753 :              * output plugin.
3754 : */
3755 0 : break;
3756 :
3757 1710 : case LOGICAL_REP_MSG_STREAM_START:
3758 1710 : apply_handle_stream_start(s);
3759 1710 : break;
3760 :
3761 1708 : case LOGICAL_REP_MSG_STREAM_STOP:
3762 1708 : apply_handle_stream_stop(s);
3763 1704 : break;
3764 :
3765 76 : case LOGICAL_REP_MSG_STREAM_ABORT:
3766 76 : apply_handle_stream_abort(s);
3767 76 : break;
3768 :
3769 122 : case LOGICAL_REP_MSG_STREAM_COMMIT:
3770 122 : apply_handle_stream_commit(s);
3771 118 : break;
3772 :
3773 32 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3774 32 : apply_handle_begin_prepare(s);
3775 32 : break;
3776 :
3777 30 : case LOGICAL_REP_MSG_PREPARE:
3778 30 : apply_handle_prepare(s);
3779 28 : break;
3780 :
3781 40 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
3782 40 : apply_handle_commit_prepared(s);
3783 40 : break;
3784 :
3785 10 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
3786 10 : apply_handle_rollback_prepared(s);
3787 10 : break;
3788 :
3789 22 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3790 22 : apply_handle_stream_prepare(s);
3791 22 : break;
3792 :
3793 0 : default:
3794 0 : ereport(ERROR,
3795 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3796 : errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3797 : }
3798 :
3799 : /* Reset the current command */
3800 674318 : apply_error_callback_arg.command = saved_command;
3801 674318 : }
3802 :
3803 : /*
3804 : * Figure out which write/flush positions to report to the walsender process.
3805 : *
3806 : * We can't simply report back the last LSN the walsender sent us because the
3807 : * local transaction might not yet be flushed to disk locally. Instead we
3808 : * build a list that associates local with remote LSNs for every commit. When
3809 : * reporting back the flush position to the sender we iterate that list and
3810 : * check which entries on it are already locally flushed. Those we can report
3811 : * as having been flushed.
3812 : *
3813 :  * *have_pending_txes is set to true if there are outstanding transactions
3814 :  * that still need to be flushed.
3815 : */
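 : /*
 :  * For example (illustrative LSNs only): if lsn_mapping holds the pairs
 :  * (remote 1/100, local 0/50) and (remote 1/200, local 0/80), and only
 :  * local 0/50 has been flushed so far, we report *flush = 1/100 and
 :  * *write = 1/200, and *have_pending_txes is set because the second entry
 :  * is still pending.
 :  */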
3816 : static void
3817 128454 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3818 : bool *have_pending_txes)
3819 : {
3820 : dlist_mutable_iter iter;
3821 128454 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3822 :
3823 128454 : *write = InvalidXLogRecPtr;
3824 128454 : *flush = InvalidXLogRecPtr;
3825 :
3826 129414 : dlist_foreach_modify(iter, &lsn_mapping)
3827 : {
3828 25488 : FlushPosition *pos =
3829 25488 : dlist_container(FlushPosition, node, iter.cur);
3830 :
3831 25488 : *write = pos->remote_end;
3832 :
3833 25488 : if (pos->local_end <= local_flush)
3834 : {
3835 960 : *flush = pos->remote_end;
3836 960 : dlist_delete(iter.cur);
3837 960 : pfree(pos);
3838 : }
3839 : else
3840 : {
3841 : /*
3842 : * Don't want to uselessly iterate over the rest of the list which
3843 : * could potentially be long. Instead get the last element and
3844 : * grab the write position from there.
3845 : */
3846 24528 : pos = dlist_tail_element(FlushPosition, node,
3847 : &lsn_mapping);
3848 24528 : *write = pos->remote_end;
3849 24528 : *have_pending_txes = true;
3850 24528 : return;
3851 : }
3852 : }
3853 :
3854 103926 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3855 : }
3856 :
3857 : /*
3858 : * Store current remote/local lsn pair in the tracking list.
3859 : */
3860 : void
3861 1072 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3862 : {
3863 : FlushPosition *flushpos;
3864 :
3865 : /*
3866 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3867 : * by the leader apply worker.
3868 : */
3869 1072 : if (am_parallel_apply_worker())
3870 38 : return;
3871 :
3872 : /* Need to do this in permanent context */
3873 1034 : MemoryContextSwitchTo(ApplyContext);
3874 :
3875 : /* Track commit lsn */
3876 1034 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3877 1034 : flushpos->local_end = local_lsn;
3878 1034 : flushpos->remote_end = remote_lsn;
3879 :
3880 1034 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3881 1034 : MemoryContextSwitchTo(ApplyMessageContext);
3882 : }
3883 :
3884 :
3885 : /* Update statistics of the worker. */
3886 : static void
3887 375670 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3888 : {
3889 375670 : MyLogicalRepWorker->last_lsn = last_lsn;
3890 375670 : MyLogicalRepWorker->last_send_time = send_time;
3891 375670 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3892 375670 : if (reply)
3893 : {
3894 5584 : MyLogicalRepWorker->reply_lsn = last_lsn;
3895 5584 : MyLogicalRepWorker->reply_time = send_time;
3896 : }
3897 375670 : }
3898 :
3899 : /*
3900 : * Apply main loop.
3901 : */
3902 : static void
3903 778 : LogicalRepApplyLoop(XLogRecPtr last_received)
3904 : {
3905 778 : TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3906 778 : bool ping_sent = false;
3907 : TimeLineID tli;
3908 : ErrorContextCallback errcallback;
3909 778 : RetainDeadTuplesData rdt_data = {0};
3910 :
3911 : /*
3912 : * Init the ApplyMessageContext which we clean up after each replication
3913 : * protocol message.
3914 : */
3915 778 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
3916 : "ApplyMessageContext",
3917 : ALLOCSET_DEFAULT_SIZES);
3918 :
3919 : /*
3920 : * This memory context is used for per-stream data when the streaming mode
3921 : * is enabled. This context is reset on each stream stop.
3922 : */
3923 778 : LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
3924 : "LogicalStreamingContext",
3925 : ALLOCSET_DEFAULT_SIZES);
3926 :
3927 : /* mark as idle, before starting to loop */
3928 778 : pgstat_report_activity(STATE_IDLE, NULL);
3929 :
3930 : /*
3931 : * Push apply error context callback. Fields will be filled while applying
3932 : * a change.
3933 : */
3934 778 : errcallback.callback = apply_error_callback;
3935 778 : errcallback.previous = error_context_stack;
3936 778 : error_context_stack = &errcallback;
3937 778 : apply_error_context_stack = error_context_stack;
3938 :
3939 : /* This outer loop iterates once per wait. */
3940 : for (;;)
3941 121844 : {
3942 122622 : pgsocket fd = PGINVALID_SOCKET;
3943 : int rc;
3944 : int len;
3945 122622 : char *buf = NULL;
3946 122622 : bool endofstream = false;
3947 : long wait_time;
3948 :
3949 122622 : CHECK_FOR_INTERRUPTS();
3950 :
3951 122622 : MemoryContextSwitchTo(ApplyMessageContext);
3952 :
3953 122622 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3954 :
3955 122588 : if (len != 0)
3956 : {
3957 : /* Loop to process all available data (without blocking). */
3958 : for (;;)
3959 : {
3960 496092 : CHECK_FOR_INTERRUPTS();
3961 :
3962 496092 : if (len == 0)
3963 : {
3964 120404 : break;
3965 : }
3966 375688 : else if (len < 0)
3967 : {
3968 18 : ereport(LOG,
3969 : (errmsg("data stream from publisher has ended")));
3970 18 : endofstream = true;
3971 18 : break;
3972 : }
3973 : else
3974 : {
3975 : int c;
3976 : StringInfoData s;
3977 :
3978 375670 : if (ConfigReloadPending)
3979 : {
3980 0 : ConfigReloadPending = false;
3981 0 : ProcessConfigFile(PGC_SIGHUP);
3982 : }
3983 :
3984 : /* Reset timeout. */
3985 375670 : last_recv_timestamp = GetCurrentTimestamp();
3986 375670 : ping_sent = false;
3987 :
3988 375670 : rdt_data.last_recv_time = last_recv_timestamp;
3989 :
3990 : /* Ensure we are reading the data into our memory context. */
3991 375670 : MemoryContextSwitchTo(ApplyMessageContext);
3992 :
3993 375670 : initReadOnlyStringInfo(&s, buf, len);
3994 :
3995 375670 : c = pq_getmsgbyte(&s);
3996 :
3997 375670 : if (c == PqReplMsg_WALData)
3998 : {
3999 : XLogRecPtr start_lsn;
4000 : XLogRecPtr end_lsn;
4001 : TimestampTz send_time;
4002 :
4003 369686 : start_lsn = pq_getmsgint64(&s);
4004 369686 : end_lsn = pq_getmsgint64(&s);
4005 369686 : send_time = pq_getmsgint64(&s);
4006 :
4007 369686 : if (last_received < start_lsn)
4008 306524 : last_received = start_lsn;
4009 :
4010 369686 : if (last_received < end_lsn)
4011 0 : last_received = end_lsn;
4012 :
4013 369686 : UpdateWorkerStats(last_received, send_time, false);
4014 :
4015 369686 : apply_dispatch(&s);
4016 :
4017 369602 : maybe_advance_nonremovable_xid(&rdt_data, false);
4018 : }
4019 5984 : else if (c == PqReplMsg_Keepalive)
4020 : {
4021 : XLogRecPtr end_lsn;
4022 : TimestampTz timestamp;
4023 : bool reply_requested;
4024 :
4025 5584 : end_lsn = pq_getmsgint64(&s);
4026 5584 : timestamp = pq_getmsgint64(&s);
4027 5584 : reply_requested = pq_getmsgbyte(&s);
4028 :
4029 5584 : if (last_received < end_lsn)
4030 1824 : last_received = end_lsn;
4031 :
4032 5584 : send_feedback(last_received, reply_requested, false);
4033 :
4034 5584 : maybe_advance_nonremovable_xid(&rdt_data, false);
4035 :
4036 5584 : UpdateWorkerStats(last_received, timestamp, true);
4037 : }
4038 400 : else if (c == PqReplMsg_PrimaryStatusUpdate)
4039 : {
4040 400 : rdt_data.remote_lsn = pq_getmsgint64(&s);
4041 400 : rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4042 400 : rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
4043 400 : rdt_data.reply_time = pq_getmsgint64(&s);
4044 :
4045 : /*
4046 :                          * This should never happen; see
4047 : * ProcessStandbyPSRequestMessage. But if it happens
4048 : * due to a bug, we don't want to proceed as it can
4049 : * incorrectly advance oldest_nonremovable_xid.
4050 : */
4051 400 : if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
4052 0 : elog(ERROR, "cannot get the latest WAL position from the publisher");
4053 :
4054 400 : maybe_advance_nonremovable_xid(&rdt_data, true);
4055 :
4056 400 : UpdateWorkerStats(last_received, rdt_data.reply_time, false);
4057 : }
4058 : /* other message types are purposefully ignored */
4059 :
4060 375586 : MemoryContextReset(ApplyMessageContext);
4061 : }
4062 :
4063 375586 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
4064 : }
4065 : }
4066 :
4067 : /* confirm all writes so far */
4068 122504 : send_feedback(last_received, false, false);
4069 :
4070 : /* Reset the timestamp if no message was received */
4071 122504 : rdt_data.last_recv_time = 0;
4072 :
4073 122504 : maybe_advance_nonremovable_xid(&rdt_data, false);
4074 :
4075 122504 : if (!in_remote_transaction && !in_streamed_transaction)
4076 : {
4077 : /*
4078 : * If we didn't get any transactions for a while there might be
4079 : * unconsumed invalidation messages in the queue, consume them
4080 :              * unconsumed invalidation messages in the queue; consume them
4081 : */
4082 7978 : AcceptInvalidationMessages();
4083 7978 : maybe_reread_subscription();
4084 :
4085 : /* Process any table synchronization changes. */
4086 7900 : process_syncing_tables(last_received);
4087 : }
4088 :
4089 : /* Cleanup the memory. */
4090 122046 : MemoryContextReset(ApplyMessageContext);
4091 122046 : MemoryContextSwitchTo(TopMemoryContext);
4092 :
4093 : /* Check if we need to exit the streaming loop. */
4094 122046 : if (endofstream)
4095 18 : break;
4096 :
4097 : /*
4098 : * Wait for more data or latch. If we have unflushed transactions,
4099 : * wake up after WalWriterDelay to see if they've been flushed yet (in
4100 : * which case we should send a feedback message). Otherwise, there's
4101 : * no particular urgency about waking up unless we get data or a
4102 : * signal.
4103 : */
4104 122028 : if (!dlist_is_empty(&lsn_mapping))
4105 21176 : wait_time = WalWriterDelay;
4106 : else
4107 100852 : wait_time = NAPTIME_PER_CYCLE;
4108 :
4109 : /*
4110 :          * Make sure we wake up when it's possible to advance the non-removable
4111 : * transaction ID.
4112 : */
4113 122028 : if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
4114 121610 : rdt_data.xid_advance_interval)
4115 104 : wait_time = Min(wait_time, rdt_data.xid_advance_interval);
4116 :
4117 122028 : rc = WaitLatchOrSocket(MyLatch,
4118 : WL_SOCKET_READABLE | WL_LATCH_SET |
4119 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
4120 : fd, wait_time,
4121 : WAIT_EVENT_LOGICAL_APPLY_MAIN);
4122 :
4123 122028 : if (rc & WL_LATCH_SET)
4124 : {
4125 1444 : ResetLatch(MyLatch);
4126 1444 : CHECK_FOR_INTERRUPTS();
4127 : }
4128 :
4129 121844 : if (ConfigReloadPending)
4130 : {
4131 16 : ConfigReloadPending = false;
4132 16 : ProcessConfigFile(PGC_SIGHUP);
4133 : }
4134 :
4135 121844 : if (rc & WL_TIMEOUT)
4136 : {
4137 : /*
4138 : * We didn't receive anything new. If we haven't heard anything
4139 : * from the server for more than wal_receiver_timeout / 2, ping
4140 : * the server. Also, if it's been longer than
4141 : * wal_receiver_status_interval since the last update we sent,
4142 : * send a status update to the primary anyway, to report any
4143 : * progress in applying WAL.
4144 : */
4145 354 : bool requestReply = false;
4146 :
4147 : /*
4148 : * Check if time since last receive from primary has reached the
4149 : * configured limit.
4150 : */
4151 354 : if (wal_receiver_timeout > 0)
4152 : {
4153 354 : TimestampTz now = GetCurrentTimestamp();
4154 : TimestampTz timeout;
4155 :
4156 354 : timeout =
4157 354 : TimestampTzPlusMilliseconds(last_recv_timestamp,
4158 : wal_receiver_timeout);
4159 :
4160 354 : if (now >= timeout)
4161 0 : ereport(ERROR,
4162 : (errcode(ERRCODE_CONNECTION_FAILURE),
4163 : errmsg("terminating logical replication worker due to timeout")));
4164 :
4165 : /* Check to see if it's time for a ping. */
4166 354 : if (!ping_sent)
4167 : {
4168 354 : timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
4169 : (wal_receiver_timeout / 2));
4170 354 : if (now >= timeout)
4171 : {
4172 0 : requestReply = true;
4173 0 : ping_sent = true;
4174 : }
4175 : }
4176 : }
4177 :
4178 354 : send_feedback(last_received, requestReply, requestReply);
4179 :
4180 354 : maybe_advance_nonremovable_xid(&rdt_data, false);
4181 :
4182 : /*
4183 : * Force reporting to ensure long idle periods don't lead to
4184 : * arbitrarily delayed stats. Stats can only be reported outside
4185 : * of (implicit or explicit) transactions. That shouldn't lead to
4186 : * stats being delayed for long, because transactions are either
4187 : * sent as a whole on commit or streamed. Streamed transactions
4188 : * are spilled to disk and applied on commit.
4189 : */
4190 354 : if (!IsTransactionState())
4191 354 : pgstat_report_stat(true);
4192 : }
4193 : }
4194 :
4195 : /* Pop the error context stack */
4196 18 : error_context_stack = errcallback.previous;
4197 18 : apply_error_context_stack = error_context_stack;
4198 :
4199 : /* All done */
4200 18 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
4201 0 : }
4202 :
4203 : /*
4204 : * Send a Standby Status Update message to server.
4205 : *
4206 :  * 'recvpos' is the latest LSN up to which we have received data; 'force' is
4207 :  * set if we need to send a response to avoid timeouts.
4208 : */
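 : /*
 :  * The reply is a standby status update message carrying the write, flush
 :  * and apply positions plus the send timestamp and a reply-requested flag,
 :  * as assembled below.
 :  */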
4209 : static void
4210 128442 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
4211 : {
4212 : static StringInfo reply_message = NULL;
4213 : static TimestampTz send_time = 0;
4214 :
4215 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
4216 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
4217 :
4218 : XLogRecPtr writepos;
4219 : XLogRecPtr flushpos;
4220 : TimestampTz now;
4221 : bool have_pending_txes;
4222 :
4223 : /*
4224 : * If the user doesn't want status to be reported to the publisher, be
4225 : * sure to exit before doing anything at all.
4226 : */
4227 128442 : if (!force && wal_receiver_status_interval <= 0)
4228 41392 : return;
4229 :
4230 : /* It's legal to not pass a recvpos */
4231 128442 : if (recvpos < last_recvpos)
4232 0 : recvpos = last_recvpos;
4233 :
4234 128442 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4235 :
4236 : /*
4237 :  * There are no outstanding transactions to flush, so we can report the
4238 :  * latest received position. This is important for synchronous replication.
4239 : */
4240 128442 : if (!have_pending_txes)
4241 103918 : flushpos = writepos = recvpos;
4242 :
4243 128442 : if (writepos < last_writepos)
4244 0 : writepos = last_writepos;
4245 :
4246 128442 : if (flushpos < last_flushpos)
4247 24434 : flushpos = last_flushpos;
4248 :
4249 128442 : now = GetCurrentTimestamp();
4250 :
4251 : /* if we've already reported everything we're good */
4252 128442 : if (!force &&
4253 126026 : writepos == last_writepos &&
4254 41932 : flushpos == last_flushpos &&
4255 41652 : !TimestampDifferenceExceeds(send_time, now,
4256 : wal_receiver_status_interval * 1000))
4257 41392 : return;
4258 87050 : send_time = now;
4259 :
4260 87050 : if (!reply_message)
4261 : {
4262 778 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4263 :
4264 778 : reply_message = makeStringInfo();
4265 778 : MemoryContextSwitchTo(oldctx);
4266 : }
4267 : else
4268 86272 : resetStringInfo(reply_message);
4269 :
4270 87050 : pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
4271 87050 : pq_sendint64(reply_message, recvpos); /* write */
4272 87050 : pq_sendint64(reply_message, flushpos); /* flush */
4273 87050 : pq_sendint64(reply_message, writepos); /* apply */
4274 87050 : pq_sendint64(reply_message, now); /* sendTime */
4275 87050 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
4276 :
4277 87050 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4278 : force,
4279 : LSN_FORMAT_ARGS(recvpos),
4280 : LSN_FORMAT_ARGS(writepos),
4281 : LSN_FORMAT_ARGS(flushpos));
4282 :
4283 87050 : walrcv_send(LogRepWorkerWalRcvConn,
4284 : reply_message->data, reply_message->len);
4285 :
4286 87050 : if (recvpos > last_recvpos)
4287 84098 : last_recvpos = recvpos;
4288 87050 : if (writepos > last_writepos)
4289 84100 : last_writepos = writepos;
4290 87050 : if (flushpos > last_flushpos)
4291 83624 : last_flushpos = flushpos;
4292 : }
4293 :
4294 : /*
4295 : * Attempt to advance the non-removable transaction ID.
4296 : *
4297 : * See comments atop worker.c for details.
4298 : */
4299 : static void
4300 498444 : maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
4301 : bool status_received)
4302 : {
4303 498444 : if (!can_advance_nonremovable_xid(rdt_data))
4304 497256 : return;
4305 :
4306 1188 : process_rdt_phase_transition(rdt_data, status_received);
4307 : }
4308 :
4309 : /*
4310 : * Preliminary check to determine if advancing the non-removable transaction ID
4311 : * is allowed.
4312 : */
4313 : static bool
4314 498444 : can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
4315 : {
4316 : /*
4317 :  * It is sufficient for the leader apply worker to manage the non-removable
4318 :  * transaction ID for a subscription; that is enough to detect update_deleted
4319 :  * reliably even for table sync or parallel apply workers.
4320 : */
4321 498444 : if (!am_leader_apply_worker())
4322 574 : return false;
4323 :
4324 : /* No need to advance if retaining dead tuples is not required */
4325 497870 : if (!MySubscription->retaindeadtuples)
4326 496682 : return false;
4327 :
4328 1188 : return true;
4329 : }
4330 :
4331 : /*
4332 : * Process phase transitions during the non-removable transaction ID
4333 : * advancement. See comments atop worker.c for details of the transition.
4334 : */
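 : /*
 :  * The phases normally advance as RDT_GET_CANDIDATE_XID ->
 :  * RDT_REQUEST_PUBLISHER_STATUS -> RDT_WAIT_FOR_PUBLISHER_STATUS ->
 :  * RDT_WAIT_FOR_LOCAL_FLUSH and then wrap back to RDT_GET_CANDIDATE_XID;
 :  * wait_for_publisher_status() may also loop back to
 :  * RDT_REQUEST_PUBLISHER_STATUS until the remote transactions it is
 :  * waiting for have finished.
 :  */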
4335 : static void
4336 1658 : process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
4337 : bool status_received)
4338 : {
4339 1658 : switch (rdt_data->phase)
4340 : {
4341 270 : case RDT_GET_CANDIDATE_XID:
4342 270 : get_candidate_xid(rdt_data);
4343 270 : break;
4344 402 : case RDT_REQUEST_PUBLISHER_STATUS:
4345 402 : request_publisher_status(rdt_data);
4346 402 : break;
4347 894 : case RDT_WAIT_FOR_PUBLISHER_STATUS:
4348 894 : wait_for_publisher_status(rdt_data, status_received);
4349 894 : break;
4350 92 : case RDT_WAIT_FOR_LOCAL_FLUSH:
4351 92 : wait_for_local_flush(rdt_data);
4352 92 : break;
4353 : }
4354 1658 : }
4355 :
4356 : /*
4357 : * Workhorse for the RDT_GET_CANDIDATE_XID phase.
4358 : */
4359 : static void
4360 270 : get_candidate_xid(RetainDeadTuplesData *rdt_data)
4361 : {
4362 : TransactionId oldest_running_xid;
4363 : TimestampTz now;
4364 :
4365 : /*
4366 : * Use last_recv_time when applying changes in the loop to avoid
4367 : * unnecessary system time retrieval. If last_recv_time is not available,
4368 : * obtain the current timestamp.
4369 : */
4370 270 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4371 :
4372 : /*
4373 : * Compute the candidate_xid and request the publisher status at most once
4374 : * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4375 : * details on how this value is dynamically adjusted. This is to avoid
4376 : * using CPU and network resources without making much progress.
4377 : */
4378 270 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4379 : rdt_data->xid_advance_interval))
4380 210 : return;
4381 :
4382 : /*
4383 : * Immediately update the timer, even if the function returns later
4384 : * without setting candidate_xid due to inactivity on the subscriber. This
4385 : * avoids frequent calls to GetOldestActiveTransactionId.
4386 : */
4387 60 : rdt_data->candidate_xid_time = now;
4388 :
4389 : /*
4390 : * Consider transactions in the current database, as only dead tuples from
4391 : * this database are required for conflict detection.
4392 : */
4393 60 : oldest_running_xid = GetOldestActiveTransactionId(false, false);
4394 :
4395 : /*
4396 :  * The oldest active transaction ID (oldest_running_xid) can't be behind
4397 :  * any previously computed value.
4398 : */
4399 : Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4400 : oldest_running_xid));
4401 :
4402 : /* Return if the oldest_nonremovable_xid cannot be advanced */
4403 60 : if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4404 : oldest_running_xid))
4405 : {
4406 22 : adjust_xid_advance_interval(rdt_data, false);
4407 22 : return;
4408 : }
4409 :
4410 38 : adjust_xid_advance_interval(rdt_data, true);
4411 :
4412 38 : rdt_data->candidate_xid = oldest_running_xid;
4413 38 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4414 :
4415 : /* process the next phase */
4416 38 : process_rdt_phase_transition(rdt_data, false);
4417 : }
4418 :
4419 : /*
4420 : * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4421 : */
4422 : static void
4423 402 : request_publisher_status(RetainDeadTuplesData *rdt_data)
4424 : {
4425 : static StringInfo request_message = NULL;
4426 :
4427 402 : if (!request_message)
4428 : {
4429 16 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4430 :
4431 16 : request_message = makeStringInfo();
4432 16 : MemoryContextSwitchTo(oldctx);
4433 : }
4434 : else
4435 386 : resetStringInfo(request_message);
4436 :
4437 : /*
4438 : * Send the current time to update the remote walsender's latest reply
4439 : * message received time.
4440 : */
4441 402 : pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
4442 402 : pq_sendint64(request_message, GetCurrentTimestamp());
4443 :
4444 402 : elog(DEBUG2, "sending publisher status request message");
4445 :
4446 : /* Send a request for the publisher status */
4447 402 : walrcv_send(LogRepWorkerWalRcvConn,
4448 : request_message->data, request_message->len);
4449 :
4450 402 : rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
4451 :
4452 : /*
4453 : * Skip calling maybe_advance_nonremovable_xid() since further transition
4454 : * is possible only once we receive the publisher status message.
4455 : */
4456 402 : }
4457 :
4458 : /*
4459 : * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
4460 : */
4461 : static void
4462 894 : wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
4463 : bool status_received)
4464 : {
4465 : /*
4466 : * Return if we have requested but not yet received the publisher status.
4467 : */
4468 894 : if (!status_received)
4469 494 : return;
4470 :
4471 400 : if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
4472 36 : rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4473 :
4474 : /*
4475 : * Check if all remote concurrent transactions that were active at the
4476 : * first status request have now completed. If completed, proceed to the
4477 : * next phase; otherwise, continue checking the publisher status until
4478 : * these transactions finish.
4479 : *
4480 : * It's possible that transactions in the commit phase during the last
4481 : * cycle have now finished committing, but remote_oldestxid remains older
4482 :  * than remote_wait_for. This can happen if some old transaction entered
4483 :  * the commit phase when we requested status in this cycle. We do not
4484 : * handle this case explicitly as it's rare and the benefit doesn't
4485 : * justify the required complexity. Tracking would require either caching
4486 : * all xids at the publisher or sending them to subscribers. The condition
4487 : * will resolve naturally once the remaining transactions are finished.
4488 : *
4489 : * Directly advancing the non-removable transaction ID is possible if
4490 : * there are no activities on the publisher since the last advancement
4491 :  * there has been no activity on the publisher since the last advancement
4492 : * and last_remote_lsn, within the structure for comparison with the
4493 : * current cycle's values. Considering the minimal cost of continuing in
4494 : * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4495 : * advance the transaction ID here.
4496 : */
4497 400 : if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
4498 : rdt_data->remote_oldestxid))
4499 36 : rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
4500 : else
4501 364 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4502 :
4503 : /* process the next phase */
4504 400 : process_rdt_phase_transition(rdt_data, false);
4505 : }
4506 :
4507 : /*
4508 : * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
4509 : */
4510 : static void
4511 92 : wait_for_local_flush(RetainDeadTuplesData *rdt_data)
4512 : {
4513 : Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
4514 : TransactionIdIsValid(rdt_data->candidate_xid));
4515 :
4516 : /*
4517 :  * We expect the publisher and subscriber clocks to be in sync using a time
4518 : * sync service like NTP. Otherwise, we will advance this worker's
4519 : * oldest_nonremovable_xid prematurely, leading to the removal of rows
4520 : * required to detect update_deleted reliably. This check primarily
4521 : * addresses scenarios where the publisher's clock falls behind; if the
4522 : * publisher's clock is ahead, subsequent transactions will naturally bear
4523 : * later commit timestamps, conforming to the design outlined atop
4524 : * worker.c.
4525 : *
4526 : * XXX Consider waiting for the publisher's clock to catch up with the
4527 : * subscriber's before proceeding to the next phase.
4528 : */
4529 92 : if (TimestampDifferenceExceeds(rdt_data->reply_time,
4530 : rdt_data->candidate_xid_time, 0))
4531 0 : ereport(ERROR,
4532 : errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4533 : errdetail_internal("The clock on the publisher is behind that of the subscriber."));
4534 :
4535 : /*
4536 : * Do not attempt to advance the non-removable transaction ID when table
4537 : * sync is in progress. During this time, changes from a single
4538 : * transaction may be applied by multiple table sync workers corresponding
4539 : * to the target tables. So, it's necessary for all table sync workers to
4540 : * apply and flush the corresponding changes before advancing the
4541 : * transaction ID, otherwise, dead tuples that are still needed for
4542 :  * transaction ID; otherwise, dead tuples that are still needed for
4543 : * However, confirming the apply and flush progress across all table sync
4544 : * workers is complex and not worth the effort, so we simply return if not
4545 : * all tables are in the READY state.
4546 : *
4547 : * It is safe to add new tables with initial states to the subscription
4548 : * after this check because any changes applied to these tables should
4549 : * have a WAL position greater than the rdt_data->remote_lsn.
4550 : */
4551 92 : if (!AllTablesyncsReady())
4552 12 : return;
4553 :
4554 : /*
4555 : * Update and check the remote flush position if we are applying changes
4556 : * in a loop. This is done at most once per WalWriterDelay to avoid
4557 : * performing costly operations in get_flush_position() too frequently
4558 : * during change application.
4559 : */
4560 112 : if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
4561 32 : TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
4562 : rdt_data->last_recv_time, WalWriterDelay))
4563 : {
4564 : XLogRecPtr writepos;
4565 : XLogRecPtr flushpos;
4566 : bool have_pending_txes;
4567 :
4568 : /* Fetch the latest remote flush position */
4569 12 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4570 :
4571 12 : if (flushpos > last_flushpos)
4572 0 : last_flushpos = flushpos;
4573 :
4574 12 : rdt_data->flushpos_update_time = rdt_data->last_recv_time;
4575 : }
4576 :
4577 : /* Return to wait for the changes to be applied */
4578 80 : if (last_flushpos < rdt_data->remote_lsn)
4579 48 : return;
4580 :
4581 : /*
4582 : * Reaching here means the remote WAL position has been received, and all
4583 : * transactions up to that position on the publisher have been applied and
4584 : * flushed locally. So, we can advance the non-removable transaction ID.
4585 : */
4586 32 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
4587 32 : MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
4588 32 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
4589 :
4590 32 : elog(DEBUG2, "confirmed flush up to remote lsn %X/%08X: new oldest_nonremovable_xid %u",
4591 : LSN_FORMAT_ARGS(rdt_data->remote_lsn),
4592 : rdt_data->candidate_xid);
4593 :
4594 : /* Notify launcher to update the xmin of the conflict slot */
4595 32 : ApplyLauncherWakeup();
4596 :
4597 : /*
4598 : * Reset all data fields except those used to determine the timing for the
4599 : * next round of transaction ID advancement. We can even use
4600 : * flushpos_update_time in the next round to decide whether to get the
4601 : * latest flush position.
4602 : */
4603 32 : rdt_data->phase = RDT_GET_CANDIDATE_XID;
4604 32 : rdt_data->remote_lsn = InvalidXLogRecPtr;
4605 32 : rdt_data->remote_oldestxid = InvalidFullTransactionId;
4606 32 : rdt_data->remote_nextxid = InvalidFullTransactionId;
4607 32 : rdt_data->reply_time = 0;
4608 32 : rdt_data->remote_wait_for = InvalidFullTransactionId;
4609 32 : rdt_data->candidate_xid = InvalidTransactionId;
4610 :
4611 : /* process the next phase */
4612 32 : process_rdt_phase_transition(rdt_data, false);
4613 : }
4614 :
4615 : /*
4616 : * Adjust the interval for advancing non-removable transaction IDs.
4617 : *
4618 : * We double the interval to try advancing the non-removable transaction IDs
4619 : * if there is no activity on the node. The maximum value of the interval is
4620 : * capped by wal_receiver_status_interval if it is not zero, otherwise to a
4621 :  * capped by wal_receiver_status_interval if it is not zero, and otherwise
4622 :  * at 3 minutes, which should be sufficient to avoid using CPU or network
4623 :  * resources without much benefit.
4624 : * The interval is reset to a minimum value of 100ms once there is some
4625 : * activity on the node.
4626 : *
4627 : * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4628 : * consider the other interval or a separate GUC if the need arises.
4629 : */
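 : /*
 :  * For example, while the node stays idle the interval grows as
 :  * 100ms -> 200ms -> 400ms -> ... until it reaches the cap described
 :  * above, and it snaps back to 100ms as soon as a new transaction ID is
 :  * observed.
 :  */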
4630 : static void
4631 60 : adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
4632 : {
4633 60 : if (!new_xid_found && rdt_data->xid_advance_interval)
4634 16 : {
4635 16 : int max_interval = wal_receiver_status_interval
4636 32 : ? wal_receiver_status_interval * 1000
4637 16 : : MAX_XID_ADVANCE_INTERVAL;
4638 :
4639 : /*
4640 : * No new transaction ID has been assigned since the last check, so
4641 : * double the interval, but not beyond the maximum allowable value.
4642 : */
4643 16 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4644 : max_interval);
4645 : }
4646 : else
4647 : {
4648 : /*
4649 : * A new transaction ID was found or the interval is not yet
4650 : * initialized, so set the interval to the minimum value.
4651 : */
4652 44 : rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
4653 : }
4654 60 : }
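            :
            : /*
            :  * Illustrative sketch (not part of worker.c): with the values stated in the
            :  * comment above (a 100ms minimum, capped at wal_receiver_status_interval or
            :  * at 3 minutes when that GUC is zero), a quiet node sees the interval grow
            :  * as
            :  *
            :  *     100ms -> 200ms -> 400ms -> ... -> capped at max_interval
            :  *
            :  * while any call with new_xid_found = true snaps it back to the 100ms
            :  * minimum.  For example, assuming wal_receiver_status_interval = 10s, the
            :  * cap is 10 * 1000 = 10000ms and is first hit on the seventh consecutive
            :  * idle doubling (100ms * 2^7 = 12800ms, clamped to 10000ms).
            :  */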
4655 :
4656 : /*
4657 : * Exit routine for apply workers due to subscription parameter changes.
4658 : */
4659 : static void
4660 84 : apply_worker_exit(void)
4661 : {
4662 84 : if (am_parallel_apply_worker())
4663 : {
4664 : /*
4665 : * Don't stop the parallel apply worker as the leader will detect the
4666 : * subscription parameter change and restart logical replication later
4667 : * anyway. This also prevents the leader from reporting errors when
4668 : * trying to communicate with a stopped parallel apply worker, which
4669 : * would accidentally disable subscriptions if disable_on_error was
4670 : * set.
4671 : */
4672 0 : return;
4673 : }
4674 :
4675 : /*
4676 : * Reset the last-start time for this apply worker so that the launcher
4677 : * will restart it without waiting for wal_retrieve_retry_interval if the
4678 : * subscription is still active, and so that we won't leak that hash table
4679 : * entry if it isn't.
4680 : */
4681 84 : if (am_leader_apply_worker())
4682 84 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4683 :
4684 84 : proc_exit(0);
4685 : }
4686 :
4687 : /*
4688 : * Reread subscription info if needed.
4689 : *
4690 : * For significant changes, we react by exiting the current process; a new
4691 : * one will be launched afterwards if needed.
4692 : */
4693 : void
4694 9972 : maybe_reread_subscription(void)
4695 : {
4696 : MemoryContext oldctx;
4697 : Subscription *newsub;
4698 9972 : bool started_tx = false;
4699 :
4700 : /* When cache state is valid there is nothing to do here. */
4701 9972 : if (MySubscriptionValid)
4702 9812 : return;
4703 :
4704 : /* This function might be called inside or outside of transaction. */
4705 160 : if (!IsTransactionState())
4706 : {
4707 150 : StartTransactionCommand();
4708 150 : started_tx = true;
4709 : }
4710 :
4711 : /* Ensure allocations in permanent context. */
4712 160 : oldctx = MemoryContextSwitchTo(ApplyContext);
4713 :
4714 160 : newsub = GetSubscription(MyLogicalRepWorker->subid, true);
4715 :
4716 : /*
4717 : * Exit if the subscription was removed. This normally should not happen
4718 : * as the worker gets killed during DROP SUBSCRIPTION.
4719 : */
4720 160 : if (!newsub)
4721 : {
4722 0 : ereport(LOG,
4723 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
4724 : MySubscription->name)));
4725 :
4726 : /* Ensure we remove no-longer-useful entry for worker's start time */
4727 0 : if (am_leader_apply_worker())
4728 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4729 :
4730 0 : proc_exit(0);
4731 : }
4732 :
4733 : /* Exit if the subscription was disabled. */
4734 160 : if (!newsub->enabled)
4735 : {
4736 24 : ereport(LOG,
4737 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
4738 : MySubscription->name)));
4739 :
4740 24 : apply_worker_exit();
4741 : }
4742 :
4743 : /* !slotname should never happen when enabled is true. */
4744 : Assert(newsub->slotname);
4745 :
4746 : /* two-phase cannot be altered while the worker is running */
4747 : Assert(newsub->twophasestate == MySubscription->twophasestate);
4748 :
4749 : /*
4750 : * Exit if any parameter that affects the remote connection was changed.
4751 : * The launcher will start a new worker; note, however, that the parallel
4752 : * apply worker won't restart if the streaming option's value is changed
4753 : * from 'parallel' to any other value or if the server decides not to
4754 : * stream the in-progress transaction.
4755 : */
4756 136 : if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
4757 132 : strcmp(newsub->name, MySubscription->name) != 0 ||
4758 130 : strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
4759 130 : newsub->binary != MySubscription->binary ||
4760 118 : newsub->stream != MySubscription->stream ||
4761 108 : newsub->passwordrequired != MySubscription->passwordrequired ||
4762 108 : strcmp(newsub->origin, MySubscription->origin) != 0 ||
4763 104 : newsub->owner != MySubscription->owner ||
4764 102 : !equal(newsub->publications, MySubscription->publications))
4765 : {
4766 52 : if (am_parallel_apply_worker())
4767 0 : ereport(LOG,
4768 : (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
4769 : MySubscription->name)));
4770 : else
4771 52 : ereport(LOG,
4772 : (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
4773 : MySubscription->name)));
4774 :
4775 52 : apply_worker_exit();
4776 : }
4777 :
4778 : /*
4779 : * Exit if the subscription owner's superuser privileges have been
4780 : * revoked.
4781 : */
4782 84 : if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
4783 : {
4784 8 : if (am_parallel_apply_worker())
4785 0 : ereport(LOG,
4786 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
4787 : MySubscription->name));
4788 : else
4789 8 : ereport(LOG,
4790 : errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
4791 : MySubscription->name));
4792 :
4793 8 : apply_worker_exit();
4794 : }
4795 :
4796 : /* Check for other changes that should also never happen. */
4797 76 : if (newsub->dbid != MySubscription->dbid)
4798 : {
4799 0 : elog(ERROR, "subscription %u changed unexpectedly",
4800 : MyLogicalRepWorker->subid);
4801 : }
4802 :
4803 : /* Clean old subscription info and switch to new one. */
4804 76 : FreeSubscription(MySubscription);
4805 76 : MySubscription = newsub;
4806 :
4807 76 : MemoryContextSwitchTo(oldctx);
4808 :
4809 : /* Change synchronous commit according to the user's wishes */
4810 76 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
4811 : PGC_BACKEND, PGC_S_OVERRIDE);
4812 :
4813 76 : if (started_tx)
4814 72 : CommitTransactionCommand();
4815 :
4816 76 : MySubscriptionValid = true;
4817 : }
4818 :
4819 : /*
4820 : * Callback from subscription syscache invalidation.
4821 : */
4822 : static void
4823 168 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
4824 : {
4825 168 : MySubscriptionValid = false;
4826 168 : }
4827 :
4828 : /*
4829 : * subxact_info_write
4830 : * Store information about subxacts for a toplevel transaction.
4831 : *
4832 : * For each subxact we store the offset of its first change in the main file.
4833 : * The file is always over-written as a whole.
4834 : *
4835 : * XXX We should only store subxacts that were not aborted yet.
4836 : */
4837 : static void
4838 744 : subxact_info_write(Oid subid, TransactionId xid)
4839 : {
4840 : char path[MAXPGPATH];
4841 : Size len;
4842 : BufFile *fd;
4843 :
4844 : Assert(TransactionIdIsValid(xid));
4845 :
4846 : /* construct the subxact filename */
4847 744 : subxact_filename(path, subid, xid);
4848 :
4849 : /* Delete the subxacts file, if it exists. */
4850 744 : if (subxact_data.nsubxacts == 0)
4851 : {
4852 580 : cleanup_subxact_info();
4853 580 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4854 :
4855 580 : return;
4856 : }
4857 :
4858 : /*
4859 : * Create the subxact file if it has not already been created, otherwise
4860 : * open the existing file.
4861 : */
4862 164 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
4863 : true);
4864 164 : if (fd == NULL)
4865 16 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
4866 :
4867 164 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4868 :
4869 : /* Write the subxact count and subxact info */
4870 164 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4871 164 : BufFileWrite(fd, subxact_data.subxacts, len);
4872 :
4873 164 : BufFileClose(fd);
4874 :
4875 : /* free the memory allocated for subxact info */
4876 164 : cleanup_subxact_info();
4877 : }
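            :
            : /*
            :  * Illustrative sketch (not part of worker.c): the on-disk layout produced
            :  * by subxact_info_write() is simply the subxact count followed by the array
            :  * of SubXactInfo entries:
            :  *
            :  *     [ nsubxacts ][ SubXactInfo 0 ][ SubXactInfo 1 ] ... [ SubXactInfo n-1 ]
            :  *
            :  * where each SubXactInfo records the subxact's xid plus the (fileno, offset)
            :  * position of its first change in the changes file, as captured by
            :  * BufFileTell() in subxact_info_add().  subxact_info_read() restores exactly
            :  * this structure into subxact_data.
            :  */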
4878 :
4879 : /*
4880 : * subxact_info_read
4881 : * Restore information about subxacts of a streamed transaction.
4882 : *
4883 : * Read information about subxacts into the structure subxact_data that can be
4884 : * used later.
4885 : */
4886 : static void
4887 688 : subxact_info_read(Oid subid, TransactionId xid)
4888 : {
4889 : char path[MAXPGPATH];
4890 : Size len;
4891 : BufFile *fd;
4892 : MemoryContext oldctx;
4893 :
4894 : Assert(!subxact_data.subxacts);
4895 : Assert(subxact_data.nsubxacts == 0);
4896 : Assert(subxact_data.nsubxacts_max == 0);
4897 :
4898 : /*
4899 : * If the subxact file doesn't exist, that means we don't have any subxact
4900 : * info.
4901 : */
4902 688 : subxact_filename(path, subid, xid);
4903 688 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
4904 : true);
4905 688 : if (fd == NULL)
4906 530 : return;
4907 :
4908 : /* read number of subxact items */
4909 158 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4910 :
4911 158 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4912 :
4913 : /* we keep the maximum as a power of 2 */
4914 158 : subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
4915 :
4916 : /*
4917 : * Allocate subxact information in the logical streaming context. We need
4918 : * this information for the duration of the stream so that we can add new
4919 : * subtransaction info to it. At stream stop we will flush this information
4920 : * to the subxact file and reset the logical streaming context.
4921 : */
4922 158 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4923 158 : subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
4924 : sizeof(SubXactInfo));
4925 158 : MemoryContextSwitchTo(oldctx);
4926 :
4927 158 : if (len > 0)
4928 158 : BufFileReadExact(fd, subxact_data.subxacts, len);
4929 :
4930 158 : BufFileClose(fd);
4931 : }
4932 :
4933 : /*
4934 : * subxact_info_add
4935 : * Add information about a subxact (offset in the main file).
4936 : */
4937 : static void
4938 205026 : subxact_info_add(TransactionId xid)
4939 : {
4940 205026 : SubXactInfo *subxacts = subxact_data.subxacts;
4941 : int64 i;
4942 :
4943 : /* We must have a valid top level stream xid and a stream fd. */
4944 : Assert(TransactionIdIsValid(stream_xid));
4945 : Assert(stream_fd != NULL);
4946 :
4947 : /*
4948 : * If the XID matches the toplevel transaction, we don't want to add it.
4949 : */
4950 205026 : if (stream_xid == xid)
4951 184778 : return;
4952 :
4953 : /*
4954 : * In most cases we're checking the same subxact as we've already seen in
4955 : * the last call, so make sure to ignore it (this change comes later).
4956 : */
4957 20248 : if (subxact_data.subxact_last == xid)
4958 20096 : return;
4959 :
4960 : /* OK, remember we're processing this XID. */
4961 152 : subxact_data.subxact_last = xid;
4962 :
4963 : /*
4964 : * Check if the transaction is already present in the array of subxacts. We
4965 : * intentionally scan the array from the tail, because we're likely adding
4966 : * a change for the most recent subtransactions.
4967 : *
4968 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4969 : * would allow us to use binary search here.
4970 : */
4971 190 : for (i = subxact_data.nsubxacts; i > 0; i--)
4972 : {
4973 : /* found, so we're done */
4974 152 : if (subxacts[i - 1].xid == xid)
4975 114 : return;
4976 : }
4977 :
4978 : /* This is a new subxact, so we need to add it to the array. */
4979 38 : if (subxact_data.nsubxacts == 0)
4980 : {
4981 : MemoryContext oldctx;
4982 :
4983 16 : subxact_data.nsubxacts_max = 128;
4984 :
4985 : /*
4986 : * Allocate this memory for subxacts in per-stream context, see
4987 : * subxact_info_read.
4988 : */
4989 16 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4990 16 : subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4991 16 : MemoryContextSwitchTo(oldctx);
4992 : }
4993 22 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
4994 : {
4995 20 : subxact_data.nsubxacts_max *= 2;
4996 20 : subxacts = repalloc(subxacts,
4997 20 : subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4998 : }
4999 :
5000 38 : subxacts[subxact_data.nsubxacts].xid = xid;
5001 :
5002 : /*
5003 : * Get the current offset of the stream file and store it as offset of
5004 : * this subxact.
5005 : */
5006 38 : BufFileTell(stream_fd,
5007 38 : &subxacts[subxact_data.nsubxacts].fileno,
5008 38 : &subxacts[subxact_data.nsubxacts].offset);
5009 :
5010 38 : subxact_data.nsubxacts++;
5011 38 : subxact_data.subxacts = subxacts;
5012 : }
5013 :
5014 : /* format filename for file containing the info about subxacts */
5015 : static inline void
5016 1494 : subxact_filename(char *path, Oid subid, TransactionId xid)
5017 : {
5018 1494 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
5019 1494 : }
5020 :
5021 : /* format filename for file containing serialized changes */
5022 : static inline void
5023 876 : changes_filename(char *path, Oid subid, TransactionId xid)
5024 : {
5025 876 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
5026 876 : }
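            :
            : /*
            :  * Illustrative sketch (not part of worker.c): with a hypothetical
            :  * subscription OID of 16394 and a remote toplevel XID of 739, the helpers
            :  * above would produce
            :  *
            :  *     changes_filename(...)  ->  "16394-739.changes"
            :  *     subxact_filename(...)  ->  "16394-739.subxacts"
            :  *
            :  * Both files are created in MyLogicalRepWorker->stream_fileset, as seen in
            :  * the callers above and below.
            :  */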
5027 :
5028 : /*
5029 : * stream_cleanup_files
5030 : * Cleanup files for a subscription / toplevel transaction.
5031 : *
5032 : * Remove files with serialized changes and subxact info for a particular
5033 : * toplevel transaction. Each subscription has a separate set of files
5034 : * for any toplevel transaction.
5035 : */
5036 : void
5037 62 : stream_cleanup_files(Oid subid, TransactionId xid)
5038 : {
5039 : char path[MAXPGPATH];
5040 :
5041 : /* Delete the changes file. */
5042 62 : changes_filename(path, subid, xid);
5043 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
5044 :
5045 : /* Delete the subxact file, if it exists. */
5046 62 : subxact_filename(path, subid, xid);
5047 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
5048 62 : }
5049 :
5050 : /*
5051 : * stream_open_file
5052 : * Open a file that we'll use to serialize changes for a toplevel
5053 : * transaction.
5054 : *
5055 : * Open a file for streamed changes from a toplevel transaction identified
5056 : * by stream_xid (global variable). If it's the first chunk of streamed
5057 : * changes for this transaction, create the buffile, otherwise open the
5058 : * previously created file.
5059 : */
5060 : static void
5061 726 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
5062 : {
5063 : char path[MAXPGPATH];
5064 : MemoryContext oldcxt;
5065 :
5066 : Assert(OidIsValid(subid));
5067 : Assert(TransactionIdIsValid(xid));
5068 : Assert(stream_fd == NULL);
5069 :
5070 :
5071 726 : changes_filename(path, subid, xid);
5072 726 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
5073 :
5074 : /*
5075 : * Create/open the buffiles under the logical streaming context so that we
5076 : * have those files until stream stop.
5077 : */
5078 726 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
5079 :
5080 : /*
5081 : * If this is the first streamed segment, create the changes file.
5082 : * Otherwise, just open the file for writing, in append mode.
5083 : */
5084 726 : if (first_segment)
5085 64 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
5086 : path);
5087 : else
5088 : {
5089 : /*
5090 : * Open the file and seek to the end of the file because we always
5091 : * append to the changes file.
5092 : */
5093 662 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
5094 : path, O_RDWR, false);
5095 662 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
5096 : }
5097 :
5098 726 : MemoryContextSwitchTo(oldcxt);
5099 726 : }
5100 :
5101 : /*
5102 : * stream_close_file
5103 : * Close the currently open file with streamed changes.
5104 : */
5105 : static void
5106 786 : stream_close_file(void)
5107 : {
5108 : Assert(stream_fd != NULL);
5109 :
5110 786 : BufFileClose(stream_fd);
5111 :
5112 786 : stream_fd = NULL;
5113 786 : }
5114 :
5115 : /*
5116 : * stream_write_change
5117 : * Serialize a change to a file for the current toplevel transaction.
5118 : *
5119 : * The change is serialized in a simple format, with length (not including
5120 : * the length), action code (identifying the message type) and message
5121 : * contents (without the subxact TransactionId value).
5122 : */
5123 : static void
5124 215108 : stream_write_change(char action, StringInfo s)
5125 : {
5126 : int len;
5127 :
5128 : Assert(stream_fd != NULL);
5129 :
5130 : /* total on-disk size, including the action type character */
5131 215108 : len = (s->len - s->cursor) + sizeof(char);
5132 :
5133 : /* first write the size */
5134 215108 : BufFileWrite(stream_fd, &len, sizeof(len));
5135 :
5136 : /* then the action */
5137 215108 : BufFileWrite(stream_fd, &action, sizeof(action));
5138 :
5139 : /* and finally the remaining part of the buffer (after the XID) */
5140 215108 : len = (s->len - s->cursor);
5141 :
5142 215108 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
5143 215108 : }
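            :
            : /*
            :  * Illustrative sketch (not part of worker.c): each record appended by
            :  * stream_write_change() therefore looks like
            :  *
            :  *     int  len;        -- action byte + payload size, excluding len itself
            :  *     char action;     -- logical replication message type
            :  *     char payload[];  -- message bytes following the remote XID
            :  *
            :  * A hypothetical reader, assuming the same BufFile APIs used elsewhere in
            :  * this file, would mirror that order:
            :  *
            :  *     int  len;
            :  *     char action;
            :  *
            :  *     BufFileReadExact(stream_fd, &len, sizeof(len));
            :  *     BufFileReadExact(stream_fd, &action, sizeof(action));
            :  *     -- then read the remaining (len - sizeof(char)) payload bytes
            :  */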
5144 :
5145 : /*
5146 : * stream_open_and_write_change
5147 : * Serialize a message to a file for the given transaction.
5148 : *
5149 : * This function is similar to stream_write_change except that it opens the
5150 : * target file if it is not already open before writing the message, and
5151 : * closes the file at the end.
5152 : */
5153 : static void
5154 10 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
5155 : {
5156 : Assert(!in_streamed_transaction);
5157 :
5158 10 : if (!stream_fd)
5159 10 : stream_start_internal(xid, false);
5160 :
5161 10 : stream_write_change(action, s);
5162 10 : stream_stop_internal(xid);
5163 10 : }
5164 :
5165 : /*
5166 : * Sets streaming options including replication slot name and origin start
5167 : * position. Workers need these options for logical replication.
5168 : */
5169 : void
5170 778 : set_stream_options(WalRcvStreamOptions *options,
5171 : char *slotname,
5172 : XLogRecPtr *origin_startpos)
5173 : {
5174 : int server_version;
5175 :
5176 778 : options->logical = true;
5177 778 : options->startpoint = *origin_startpos;
5178 778 : options->slotname = slotname;
5179 :
5180 778 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
5181 778 : options->proto.logical.proto_version =
5182 778 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
5183 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
5184 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
5185 : LOGICALREP_PROTO_VERSION_NUM;
5186 :
5187 778 : options->proto.logical.publication_names = MySubscription->publications;
5188 778 : options->proto.logical.binary = MySubscription->binary;
5189 :
5190 : /*
5191 : * Assign the appropriate value for the streaming option according to
5192 : * the 'streaming' mode and the publisher's ability to support that mode.
5193 : */
5194 778 : if (server_version >= 160000 &&
5195 778 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
5196 : {
5197 710 : options->proto.logical.streaming_str = "parallel";
5198 710 : MyLogicalRepWorker->parallel_apply = true;
5199 : }
5200 68 : else if (server_version >= 140000 &&
5201 68 : MySubscription->stream != LOGICALREP_STREAM_OFF)
5202 : {
5203 52 : options->proto.logical.streaming_str = "on";
5204 52 : MyLogicalRepWorker->parallel_apply = false;
5205 : }
5206 : else
5207 : {
5208 16 : options->proto.logical.streaming_str = NULL;
5209 16 : MyLogicalRepWorker->parallel_apply = false;
5210 : }
5211 :
5212 778 : options->proto.logical.twophase = false;
5213 778 : options->proto.logical.origin = pstrdup(MySubscription->origin);
5214 778 : }
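            :
            : /*
            :  * Illustrative sketch (not part of worker.c): the streaming negotiation
            :  * above boils down to
            :  *
            :  *     publisher version   subscription streaming       streaming_str  parallel_apply
            :  *     >= 160000           parallel                     "parallel"     true
            :  *     >= 140000           on (or parallel, older pub)  "on"           false
            :  *     otherwise           off or unsupported           NULL           false
            :  *
            :  * and proto_version is chosen analogously from the LOGICALREP_PROTO_*
            :  * constants used above, i.e. the newest protocol the publisher's version
            :  * can support.
            :  */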
5215 :
5216 : /*
5217 : * Cleanup the memory for subxacts and reset the related variables.
5218 : */
5219 : static inline void
5220 752 : cleanup_subxact_info()
5221 : {
5222 752 : if (subxact_data.subxacts)
5223 174 : pfree(subxact_data.subxacts);
5224 :
5225 752 : subxact_data.subxacts = NULL;
5226 752 : subxact_data.subxact_last = InvalidTransactionId;
5227 752 : subxact_data.nsubxacts = 0;
5228 752 : subxact_data.nsubxacts_max = 0;
5229 752 : }
5230 :
5231 : /*
5232 : * Common function to run the apply loop with error handling. Disable the
5233 : * subscription, if necessary.
5234 : *
5235 : * Note that we don't handle FATAL errors, which are most likely caused by
5236 : * system resource errors and are not repeatable.
5237 : */
5238 : void
5239 778 : start_apply(XLogRecPtr origin_startpos)
5240 : {
5241 778 : PG_TRY();
5242 : {
5243 778 : LogicalRepApplyLoop(origin_startpos);
5244 : }
5245 130 : PG_CATCH();
5246 : {
5247 : /*
5248 : * Reset the origin state to prevent the advancement of origin
5249 : * progress if we fail to apply. Otherwise, this will result in
5250 : * transaction loss as that transaction won't be sent again by the
5251 : * server.
5252 : */
5253 130 : replorigin_reset(0, (Datum) 0);
5254 :
5255 130 : if (MySubscription->disableonerr)
5256 6 : DisableSubscriptionAndExit();
5257 : else
5258 : {
5259 : /*
5260 : * Report the worker failed while applying changes. Abort the
5261 : * current transaction so that the stats message is sent in an
5262 : * idle state.
5263 : */
5264 124 : AbortOutOfAnyTransaction();
5265 124 : pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
5266 :
5267 124 : PG_RE_THROW();
5268 : }
5269 : }
5270 0 : PG_END_TRY();
5271 0 : }
5272 :
5273 : /*
5274 : * Runs the leader apply worker.
5275 : *
5276 : * It sets up replication origin, streaming options and then starts streaming.
5277 : */
5278 : static void
5279 484 : run_apply_worker()
5280 : {
5281 : char originname[NAMEDATALEN];
5282 484 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
5283 484 : char *slotname = NULL;
5284 : WalRcvStreamOptions options;
5285 : RepOriginId originid;
5286 : TimeLineID startpointTLI;
5287 : char *err;
5288 : bool must_use_password;
5289 :
5290 484 : slotname = MySubscription->slotname;
5291 :
5292 : /*
5293 : * This shouldn't happen if the subscription is enabled, but guard against
5294 : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5295 : * slot is NULL.)
5296 : */
5297 484 : if (!slotname)
5298 0 : ereport(ERROR,
5299 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5300 : errmsg("subscription has no replication slot set")));
5301 :
5302 : /* Setup replication origin tracking. */
5303 484 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
5304 : originname, sizeof(originname));
5305 484 : StartTransactionCommand();
5306 484 : originid = replorigin_by_name(originname, true);
5307 484 : if (!OidIsValid(originid))
5308 0 : originid = replorigin_create(originname);
5309 484 : replorigin_session_setup(originid, 0);
5310 484 : replorigin_session_origin = originid;
5311 484 : origin_startpos = replorigin_session_get_progress(false);
5312 484 : CommitTransactionCommand();
5313 :
5314 : /* Is the use of a password mandatory? */
5315 932 : must_use_password = MySubscription->passwordrequired &&
5316 448 : !MySubscription->ownersuperuser;
5317 :
5318 484 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
5319 : true, must_use_password,
5320 : MySubscription->name, &err);
5321 :
5322 462 : if (LogRepWorkerWalRcvConn == NULL)
5323 50 : ereport(ERROR,
5324 : (errcode(ERRCODE_CONNECTION_FAILURE),
5325 : errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5326 : MySubscription->name, err)));
5327 :
5328 : /*
5329 : * We don't really use the output of identify_system for anything, but it
5330 : * does some initialization on the upstream, so let's still call it.
5331 : */
5332 412 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
5333 :
5334 412 : set_apply_error_context_origin(originname);
5335 :
5336 412 : set_stream_options(&options, slotname, &origin_startpos);
5337 :
5338 : /*
5339 : * Even when the two_phase mode is requested by the user, it remains as
5340 : * the tri-state PENDING until all tablesyncs have reached READY state.
5341 : * Only then can it become ENABLED.
5342 : *
5343 : * Note: If the subscription has no tables then leave the state as
5344 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5345 : * work.
5346 : */
5347 444 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
5348 32 : AllTablesyncsReady())
5349 : {
5350 : /* Start streaming with two_phase enabled */
5351 18 : options.proto.logical.twophase = true;
5352 18 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5353 :
5354 18 : StartTransactionCommand();
5355 :
5356 : /*
5357 : * Updating pg_subscription might involve TOAST table access, so
5358 : * ensure we have a valid snapshot.
5359 : */
5360 18 : PushActiveSnapshot(GetTransactionSnapshot());
5361 :
5362 18 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
5363 18 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
5364 18 : PopActiveSnapshot();
5365 18 : CommitTransactionCommand();
5366 : }
5367 : else
5368 : {
5369 394 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5370 : }
5371 :
5372 412 : ereport(DEBUG1,
5373 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5374 : MySubscription->name,
5375 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
5376 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
5377 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
5378 : "?")));
5379 :
5380 : /* Run the main loop. */
5381 412 : start_apply(origin_startpos);
5382 0 : }
5383 :
5384 : /*
5385 : * Common initialization for leader apply worker, parallel apply worker and
5386 : * tablesync worker.
5387 : *
5388 : * Initialize the database connection, in-memory subscription and necessary
5389 : * config options.
5390 : */
5391 : void
5392 900 : InitializeLogRepWorker(void)
5393 : {
5394 : MemoryContext oldctx;
5395 :
5396 : /* Run as replica session replication role. */
5397 900 : SetConfigOption("session_replication_role", "replica",
5398 : PGC_SUSET, PGC_S_OVERRIDE);
5399 :
5400 : /* Connect to our database. */
5401 900 : BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
5402 900 : MyLogicalRepWorker->userid,
5403 : 0);
5404 :
5405 : /*
5406 : * Set always-secure search path, so malicious users can't redirect user
5407 : * code (e.g. pg_index.indexprs).
5408 : */
5409 896 : SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5410 :
5411 : /* Load the subscription into persistent memory context. */
5412 896 : ApplyContext = AllocSetContextCreate(TopMemoryContext,
5413 : "ApplyContext",
5414 : ALLOCSET_DEFAULT_SIZES);
5415 896 : StartTransactionCommand();
5416 896 : oldctx = MemoryContextSwitchTo(ApplyContext);
5417 :
5418 896 : MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
5419 892 : if (!MySubscription)
5420 : {
5421 0 : ereport(LOG,
5422 : (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5423 : MyLogicalRepWorker->subid)));
5424 :
5425 : /* Ensure we remove no-longer-useful entry for worker's start time */
5426 0 : if (am_leader_apply_worker())
5427 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5428 :
5429 0 : proc_exit(0);
5430 : }
5431 :
5432 892 : MySubscriptionValid = true;
5433 892 : MemoryContextSwitchTo(oldctx);
5434 :
5435 892 : if (!MySubscription->enabled)
5436 : {
5437 0 : ereport(LOG,
5438 : (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5439 : MySubscription->name)));
5440 :
5441 0 : apply_worker_exit();
5442 : }
5443 :
5444 : /*
5445 : * Restart the worker if retain_dead_tuples was enabled during startup.
5446 : *
5447 : * At this point, the replication slot used for conflict detection might
5448 : * not exist yet, or could be dropped soon if the launcher perceives
5449 : * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5450 : * oldest_nonremovable_xid when the slot is absent or at risk of being
5451 : * dropped, a restart is initiated.
5452 : *
5453 : * The oldest_nonremovable_xid should be initialized only when
5454 : * retain_dead_tuples is enabled before launching the worker. See
5455 : * logicalrep_worker_launch.
5456 : */
5457 892 : if (am_leader_apply_worker() &&
5458 484 : MySubscription->retaindeadtuples &&
5459 18 : !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
5460 : {
5461 0 : ereport(LOG,
5462 : errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5463 : MySubscription->name, "retain_dead_tuples"));
5464 :
5465 0 : apply_worker_exit();
5466 : }
5467 :
5468 : /* Setup synchronous commit according to the user's wishes */
5469 892 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
5470 : PGC_BACKEND, PGC_S_OVERRIDE);
5471 :
5472 : /*
5473 : * Keep us informed about subscription or role changes. Note that the
5474 : * role's superuser privilege can be revoked.
5475 : */
5476 892 : CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
5477 : subscription_change_cb,
5478 : (Datum) 0);
5479 :
5480 892 : CacheRegisterSyscacheCallback(AUTHOID,
5481 : subscription_change_cb,
5482 : (Datum) 0);
5483 :
5484 892 : if (am_tablesync_worker())
5485 388 : ereport(LOG,
5486 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5487 : MySubscription->name,
5488 : get_rel_name(MyLogicalRepWorker->relid))));
5489 : else
5490 504 : ereport(LOG,
5491 : (errmsg("logical replication apply worker for subscription \"%s\" has started",
5492 : MySubscription->name)));
5493 :
5494 892 : CommitTransactionCommand();
5495 892 : }
5496 :
5497 : /*
5498 : * Reset the origin state.
5499 : */
5500 : static void
5501 1002 : replorigin_reset(int code, Datum arg)
5502 : {
5503 1002 : replorigin_session_origin = InvalidRepOriginId;
5504 1002 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
5505 1002 : replorigin_session_origin_timestamp = 0;
5506 1002 : }
5507 :
5508 : /* Common function to setup the leader apply or tablesync worker. */
5509 : void
5510 880 : SetupApplyOrSyncWorker(int worker_slot)
5511 : {
5512 : /* Attach to slot */
5513 880 : logicalrep_worker_attach(worker_slot);
5514 :
5515 : Assert(am_tablesync_worker() || am_leader_apply_worker());
5516 :
5517 : /* Setup signal handling */
5518 880 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
5519 880 : pqsignal(SIGTERM, die);
5520 880 : BackgroundWorkerUnblockSignals();
5521 :
5522 : /*
5523 : * We don't currently need any ResourceOwner in a walreceiver process, but
5524 : * if we did, we could call CreateAuxProcessResourceOwner here.
5525 : */
5526 :
5527 : /* Initialise stats to a sane-ish value */
5528 880 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
5529 880 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
5530 :
5531 : /* Load the libpq-specific functions */
5532 880 : load_file("libpqwalreceiver", false);
5533 :
5534 880 : InitializeLogRepWorker();
5535 :
5536 : /*
5537 : * Register a callback to reset the origin state before aborting any
5538 : * pending transaction during shutdown (see ShutdownPostgres()). This will
5539 : * avoid origin advancement for an incomplete transaction, which could
5540 : * otherwise lead to its loss as such a transaction won't be sent by the
5541 : * server again.
5542 : *
5543 : * Note that even a LOG or DEBUG statement placed after setting the origin
5544 : * state may process a shutdown signal before committing the current apply
5545 : * operation. So, it is important to register such a callback here.
5546 : */
5547 872 : before_shmem_exit(replorigin_reset, (Datum) 0);
5548 :
5549 : /* Connect to the origin and start the replication. */
5550 872 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5551 : MySubscription->conninfo);
5552 :
5553 : /*
5554 : * Setup callback for syscache so that we know when something changes in
5555 : * the subscription relation state.
5556 : */
5557 872 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
5558 : invalidate_syncing_table_states,
5559 : (Datum) 0);
5560 872 : }
5561 :
5562 : /* Logical Replication Apply worker entry point */
5563 : void
5564 490 : ApplyWorkerMain(Datum main_arg)
5565 : {
5566 490 : int worker_slot = DatumGetInt32(main_arg);
5567 :
5568 490 : InitializingApplyWorker = true;
5569 :
5570 490 : SetupApplyOrSyncWorker(worker_slot);
5571 :
5572 484 : InitializingApplyWorker = false;
5573 :
5574 484 : run_apply_worker();
5575 :
5576 0 : proc_exit(0);
5577 : }
5578 :
5579 : /*
5580 : * After error recovery, disable the subscription in a new transaction
5581 : * and exit cleanly.
5582 : */
5583 : void
5584 8 : DisableSubscriptionAndExit(void)
5585 : {
5586 : /*
5587 : * Emit the error message, and recover from the error state to an idle
5588 : * state
5589 : */
5590 8 : HOLD_INTERRUPTS();
5591 :
5592 8 : EmitErrorReport();
5593 8 : AbortOutOfAnyTransaction();
5594 8 : FlushErrorState();
5595 :
5596 8 : RESUME_INTERRUPTS();
5597 :
5598 : /* Report the worker failed during either table synchronization or apply */
5599 8 : pgstat_report_subscription_error(MyLogicalRepWorker->subid,
5600 8 : !am_tablesync_worker());
5601 :
5602 : /* Disable the subscription */
5603 8 : StartTransactionCommand();
5604 :
5605 : /*
5606 : * Updating pg_subscription might involve TOAST table access, so ensure we
5607 : * have a valid snapshot.
5608 : */
5609 8 : PushActiveSnapshot(GetTransactionSnapshot());
5610 :
5611 8 : DisableSubscription(MySubscription->oid);
5612 8 : PopActiveSnapshot();
5613 8 : CommitTransactionCommand();
5614 :
5615 : /* Ensure we remove no-longer-useful entry for worker's start time */
5616 8 : if (am_leader_apply_worker())
5617 6 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5618 :
5619 : /* Notify the subscription has been disabled and exit */
5620 8 : ereport(LOG,
5621 : errmsg("subscription \"%s\" has been disabled because of an error",
5622 : MySubscription->name));
5623 :
5624 : /*
5625 : * Skip the track_commit_timestamp check when disabling the worker due to
5626 : * an error, as verifying commit timestamps is unnecessary in this
5627 : * context.
5628 : */
5629 8 : if (MySubscription->retaindeadtuples)
5630 0 : CheckSubDeadTupleRetention(false, true, WARNING);
5631 :
5632 8 : proc_exit(0);
5633 : }
5634 :
5635 : /*
5636 : * Is current process a logical replication worker?
5637 : */
5638 : bool
5639 3990 : IsLogicalWorker(void)
5640 : {
5641 3990 : return MyLogicalRepWorker != NULL;
5642 : }
5643 :
5644 : /*
5645 : * Is current process a logical replication parallel apply worker?
5646 : */
5647 : bool
5648 2748 : IsLogicalParallelApplyWorker(void)
5649 : {
5650 2748 : return IsLogicalWorker() && am_parallel_apply_worker();
5651 : }
5652 :
5653 : /*
5654 : * Start skipping changes of the transaction if the given LSN matches the
5655 : * LSN specified by subscription's skiplsn.
5656 : */
5657 : static void
5658 1022 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
5659 : {
5660 : Assert(!is_skipping_changes());
5661 : Assert(!in_remote_transaction);
5662 : Assert(!in_streamed_transaction);
5663 :
5664 : /*
5665 : * Quick return if it's not requested to skip this transaction. This
5666 : * function is called for every remote transaction and we assume that
5667 : * skipping the transaction is not used often.
5668 : */
5669 1022 : if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
5670 : MySubscription->skiplsn != finish_lsn))
5671 1016 : return;
5672 :
5673 : /* Start skipping all changes of this transaction */
5674 6 : skip_xact_finish_lsn = finish_lsn;
5675 :
5676 6 : ereport(LOG,
5677 : errmsg("logical replication starts skipping transaction at LSN %X/%08X",
5678 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
5679 : }
5680 :
5681 : /*
5682 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
5683 : */
5684 : static void
5685 54 : stop_skipping_changes(void)
5686 : {
5687 54 : if (!is_skipping_changes())
5688 48 : return;
5689 :
5690 6 : ereport(LOG,
5691 : errmsg("logical replication completed skipping transaction at LSN %X/%08X",
5692 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
5693 :
5694 : /* Stop skipping changes */
5695 6 : skip_xact_finish_lsn = InvalidXLogRecPtr;
5696 : }
5697 :
5698 : /*
5699 : * Clear subskiplsn of pg_subscription catalog.
5700 : *
5701 : * finish_lsn is the transaction's finish LSN that is used to check if the
5702 : * subskiplsn matches it. If they do not match, we raise a warning when
5703 : * clearing the subskiplsn in order to inform users of cases where, e.g., the
5704 : * user mistakenly specified the wrong subskiplsn.
5705 : */
5706 : static void
5707 1040 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
5708 : {
5709 : Relation rel;
5710 : Form_pg_subscription subform;
5711 : HeapTuple tup;
5712 1040 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
5713 1040 : bool started_tx = false;
5714 :
5715 1040 : if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
5716 1034 : return;
5717 :
5718 6 : if (!IsTransactionState())
5719 : {
5720 2 : StartTransactionCommand();
5721 2 : started_tx = true;
5722 : }
5723 :
5724 : /*
5725 : * Updating pg_subscription might involve TOAST table access, so ensure we
5726 : * have a valid snapshot.
5727 : */
5728 6 : PushActiveSnapshot(GetTransactionSnapshot());
5729 :
5730 : /*
5731 : * Protect subskiplsn of pg_subscription from being concurrently updated
5732 : * while clearing it.
5733 : */
5734 6 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
5735 : AccessShareLock);
5736 :
5737 6 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
5738 :
5739 : /* Fetch the existing tuple. */
5740 6 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
5741 : ObjectIdGetDatum(MySubscription->oid));
5742 :
5743 6 : if (!HeapTupleIsValid(tup))
5744 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
5745 :
5746 6 : subform = (Form_pg_subscription) GETSTRUCT(tup);
5747 :
5748 : /*
5749 : * Clear the subskiplsn. If the user has already changed subskiplsn before
5750 : * we clear it, we don't update the catalog and the replication origin
5751 : * state won't get advanced. So, in the worst case, if the server crashes
5752 : * before sending an acknowledgment of the flush position, the transaction
5753 : * will be sent again and the user will need to set subskiplsn again. We
5754 : * could reduce that possibility by logging a replication origin WAL record
5755 : * to advance the origin LSN instead, but there is no way to advance the
5756 : * origin timestamp and it doesn't seem to be worth doing anything about
5757 : * it since it's a very rare case.
5758 : */
5759 6 : if (subform->subskiplsn == myskiplsn)
5760 : {
5761 : bool nulls[Natts_pg_subscription];
5762 : bool replaces[Natts_pg_subscription];
5763 : Datum values[Natts_pg_subscription];
5764 :
5765 6 : memset(values, 0, sizeof(values));
5766 6 : memset(nulls, false, sizeof(nulls));
5767 6 : memset(replaces, false, sizeof(replaces));
5768 :
5769 : /* reset subskiplsn */
5770 6 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
5771 6 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
5772 :
5773 6 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
5774 : replaces);
5775 6 : CatalogTupleUpdate(rel, &tup->t_self, tup);
5776 :
5777 6 : if (myskiplsn != finish_lsn)
5778 0 : ereport(WARNING,
5779 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
5780 : errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
5781 : LSN_FORMAT_ARGS(finish_lsn),
5782 : LSN_FORMAT_ARGS(myskiplsn)));
5783 : }
5784 :
5785 6 : heap_freetuple(tup);
5786 6 : table_close(rel, NoLock);
5787 :
5788 6 : PopActiveSnapshot();
5789 :
5790 6 : if (started_tx)
5791 2 : CommitTransactionCommand();
5792 : }
5793 :
5794 : /* Error callback to give more context info about the change being applied */
5795 : void
5796 1538 : apply_error_callback(void *arg)
5797 : {
5798 1538 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
5799 :
5800 1538 : if (apply_error_callback_arg.command == 0)
5801 782 : return;
5802 :
5803 : Assert(errarg->origin_name);
5804 :
5805 756 : if (errarg->rel == NULL)
5806 : {
5807 646 : if (!TransactionIdIsValid(errarg->remote_xid))
5808 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
5809 : errarg->origin_name,
5810 : logicalrep_message_type(errarg->command));
5811 646 : else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5812 532 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
5813 : errarg->origin_name,
5814 : logicalrep_message_type(errarg->command),
5815 : errarg->remote_xid);
5816 : else
5817 228 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
5818 : errarg->origin_name,
5819 : logicalrep_message_type(errarg->command),
5820 : errarg->remote_xid,
5821 114 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5822 : }
5823 : else
5824 : {
5825 110 : if (errarg->remote_attnum < 0)
5826 : {
5827 110 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5828 4 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
5829 : errarg->origin_name,
5830 : logicalrep_message_type(errarg->command),
5831 2 : errarg->rel->remoterel.nspname,
5832 2 : errarg->rel->remoterel.relname,
5833 : errarg->remote_xid);
5834 : else
5835 216 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
5836 : errarg->origin_name,
5837 : logicalrep_message_type(errarg->command),
5838 108 : errarg->rel->remoterel.nspname,
5839 108 : errarg->rel->remoterel.relname,
5840 : errarg->remote_xid,
5841 108 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5842 : }
5843 : else
5844 : {
5845 0 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5846 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
5847 : errarg->origin_name,
5848 : logicalrep_message_type(errarg->command),
5849 0 : errarg->rel->remoterel.nspname,
5850 0 : errarg->rel->remoterel.relname,
5851 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
5852 : errarg->remote_xid);
5853 : else
5854 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
5855 : errarg->origin_name,
5856 : logicalrep_message_type(errarg->command),
5857 0 : errarg->rel->remoterel.nspname,
5858 0 : errarg->rel->remoterel.relname,
5859 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
5860 : errarg->remote_xid,
5861 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5862 : }
5863 : }
5864 : }
5865 :
5866 : /* Set transaction information of apply error callback */
5867 : static inline void
5868 5806 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
5869 : {
5870 5806 : apply_error_callback_arg.remote_xid = xid;
5871 5806 : apply_error_callback_arg.finish_lsn = lsn;
5872 5806 : }
5873 :
5874 : /* Reset all information of apply error callback */
5875 : static inline void
5876 2858 : reset_apply_error_context_info(void)
5877 : {
5878 2858 : apply_error_callback_arg.command = 0;
5879 2858 : apply_error_callback_arg.rel = NULL;
5880 2858 : apply_error_callback_arg.remote_attnum = -1;
5881 2858 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
5882 2858 : }
5883 :
5884 : /*
5885 : * Request wakeup of the workers for the given subscription OID
5886 : * at commit of the current transaction.
5887 : *
5888 : * This is used to ensure that the workers process assorted changes
5889 : * as soon as possible.
5890 : */
5891 : void
5892 410 : LogicalRepWorkersWakeupAtCommit(Oid subid)
5893 : {
5894 : MemoryContext oldcxt;
5895 :
5896 410 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
5897 410 : on_commit_wakeup_workers_subids =
5898 410 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
5899 410 : MemoryContextSwitchTo(oldcxt);
5900 410 : }
5901 :
5902 : /*
5903 : * Wake up the workers of any subscriptions that were changed in this xact.
5904 : */
5905 : void
5906 1054898 : AtEOXact_LogicalRepWorkers(bool isCommit)
5907 : {
5908 1054898 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
5909 : {
5910 : ListCell *lc;
5911 :
5912 400 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5913 800 : foreach(lc, on_commit_wakeup_workers_subids)
5914 : {
5915 400 : Oid subid = lfirst_oid(lc);
5916 : List *workers;
5917 : ListCell *lc2;
5918 :
5919 400 : workers = logicalrep_workers_find(subid, true, false);
5920 516 : foreach(lc2, workers)
5921 : {
5922 116 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5923 :
5924 116 : logicalrep_worker_wakeup_ptr(worker);
5925 : }
5926 : }
5927 400 : LWLockRelease(LogicalRepWorkerLock);
5928 : }
5929 :
5930 : /* The List storage will be reclaimed automatically in xact cleanup. */
5931 1054898 : on_commit_wakeup_workers_subids = NIL;
5932 1054898 : }
5933 :
5934 : /*
5935 : * Allocate the origin name in long-lived context for error context message.
5936 : */
5937 : void
5938 798 : set_apply_error_context_origin(char *originname)
5939 : {
5940 798 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
5941 : originname);
5942 798 : }
5943 :
5944 : /*
5945 : * Return the action to be taken for the given transaction. See
5946 : * TransApplyAction for information on each of the actions.
5947 : *
5948 : * *winfo is assigned to the destination parallel worker info when the leader
5949 : * apply worker has to pass all the transaction's changes to the parallel
5950 : * apply worker.
5951 : */
5952 : static TransApplyAction
5953 652480 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
5954 : {
5955 652480 : *winfo = NULL;
5956 :
5957 652480 : if (am_parallel_apply_worker())
5958 : {
5959 137856 : return TRANS_PARALLEL_APPLY;
5960 : }
5961 :
5962 : /*
5963 : * If we are processing this transaction using a parallel apply worker,
5964 : * then either we send the changes to the parallel worker or, if that
5965 : * worker is busy, we serialize the changes to a file which will later be
5966 : * processed by the parallel worker.
5967 : */
5968 514624 : *winfo = pa_find_worker(xid);
5969 :
5970 514624 : if (*winfo && (*winfo)->serialize_changes)
5971 : {
5972 10074 : return TRANS_LEADER_PARTIAL_SERIALIZE;
5973 : }
5974 504550 : else if (*winfo)
5975 : {
5976 137824 : return TRANS_LEADER_SEND_TO_PARALLEL;
5977 : }
5978 :
5979 : /*
5980 : * If there is no parallel worker involved in processing this transaction,
5981 : * then we either apply the change directly or serialize it to a file
5982 : * which will later be applied when the transaction finish message is
5983 : * processed.
5984 : */
5985 366726 : else if (in_streamed_transaction)
5986 : {
5987 206398 : return TRANS_LEADER_SERIALIZE;
5988 : }
5989 : else
5990 : {
5991 160328 : return TRANS_LEADER_APPLY;
5992 : }
5993 : }
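            :
            : /*
            :  * Illustrative sketch (not part of worker.c): the decision above boils
            :  * down to the following, in order of evaluation:
            :  *
            :  *     running inside a parallel apply worker                    -> TRANS_PARALLEL_APPLY
            :  *     pa_find_worker() found a worker, serialize_changes set    -> TRANS_LEADER_PARTIAL_SERIALIZE
            :  *     pa_find_worker() found a worker, serialize_changes unset  -> TRANS_LEADER_SEND_TO_PARALLEL
            :  *     no worker, but inside a streamed transaction              -> TRANS_LEADER_SERIALIZE
            :  *     no worker, regular transaction                            -> TRANS_LEADER_APPLY
            :  */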
|