Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : * from the remote logical replication stream.
14 : * The main worker (apply) is started by the logical replication worker
15 : * launcher for every enabled subscription in a database. It uses the
16 : * walsender protocol to communicate with the publisher.
17 : *
18 : * This module includes server-facing code and shares the libpqwalreceiver
19 : * module with walreceiver to provide the libpq-specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option to on.
31 : *
32 : * Unlike the regular (non-streamed) case, streamed transactions have to
33 : * handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which are then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in the temporary-file directory by default, and the
38 : * filenames include both the XID of the toplevel transaction and the OID of
39 : * the subscription. This is necessary so that different workers processing
40 : * remote transactions with the same XID don't interfere with each other.
41 : *
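 : * As an illustration only (the authoritative helper is changes_filename()
 : * declared further below; the exact format shown here is a sketch), such a
 : * filename could be composed as:
 : *
 : *     snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
 : *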
42 : * We use BufFiles instead of normal temporary files because (a) the
43 : * BufFile infrastructure supports temporary files that exceed the OS file
44 : * size limit, (b) it provides automatic cleanup on error, and (c) it allows
45 : * these files to survive across local transactions so that they can be
46 : * opened and closed at stream start and stop. We use the FileSet
47 : * infrastructure because, without it, a BufFile is deleted as soon as it is
48 : * closed, and keeping the stream files open across start/stop streams
49 : * instead would consume a lot of memory (more than 8K for each BufFile, and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, without FileSet we would also need to invent a new way
53 : * to pass filenames to the BufFile APIs so that the desired file could be
54 : * reopened across multiple stream-open calls for the same
55 : * transaction.
56 : *
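 : * For illustration, a hedged sketch of how a stream file might be created
 : * and later reopened via the FileSet (the authoritative code is
 : * stream_open_file() below; "fileset" here stands for the worker's FileSet):
 : *
 : *     changes_filename(path, subid, xid);
 : *     if (first_segment)
 : *         stream_fd = BufFileCreateFileSet(fileset, path);
 : *     else
 : *         stream_fd = BufFileOpenFileSet(fileset, path, O_RDWR, false);
 : *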
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option to parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two-phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared, respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * while the tablesync is busy doing the initial copy. In this case, the
69 : * apply worker skips all the prepared operations (e.g. inserts) while the
70 : * tablesync is still busy (see the condition of should_apply_changes_for_rel).
71 : * The tablesync worker might not get such a prepared transaction at all, say
72 : * because it was prior to the initial consistent point, but might have got
73 : * some later commits. The tablesync worker will then exit without doing
74 : * anything for the prepared transaction skipped by the apply worker, as its
75 : * sync location will already be ahead of the apply worker's current location.
76 : * This would lead to an "empty prepare": later, when the apply worker does
77 : * the commit prepared, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this and similar prepare confusions, the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed, i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until two_phase is properly available (ENABLED), the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * process_syncing_tables_for_apply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING, it starts streaming messages with the
98 : * two_phase option, which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time we have not enabled two_phase, the
101 : * publisher (replication server) would have skipped some prepares, but we
102 : * ensure that such prepares are sent along with the commit prepared; see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
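 : * For reference (a sketch based on the constants defined in
 : * pg_subscription.h; check there for the authoritative values), the
 : * tri-state is stored in that column as a single character:
 : *
 : *     LOGICALREP_TWOPHASE_STATE_DISABLED   'd'
 : *     LOGICALREP_TWOPHASE_STATE_PENDING    'p'
 : *     LOGICALREP_TWOPHASE_STATE_ENABLED    'e'
 : *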
112 : * Finally, to avoid the problems mentioned in the previous paragraphs from any
113 : * subsequent (not-READY) tablesyncs (which would require toggling the two_phase
114 : * option from 'on' to 'off' and back to 'on' again), there is a restriction on
115 : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
116 : * the two_phase tri-state is ENABLED, except when copy_data = false.
117 : *
118 : * We can get a prepare for the same GID more than once in the genuine case
119 : * where we have defined multiple subscriptions for publications on the same
120 : * server and the prepared transaction has operations on tables subscribed to
121 : * by those subscriptions. In such cases, if we used the GID sent by the
122 : * publisher, one of the prepares would succeed and the others would fail, in
123 : * which case the server would send them again. This can lead to a deadlock
124 : * if the user has set synchronous_standby_names for all the subscriptions on
125 : * the subscriber. To avoid such deadlocks, we generate a unique GID
126 : * (consisting of the subscription OID and the XID of the prepared
127 : * transaction) for each prepared transaction on the subscriber.
128 : *
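 : * A minimal sketch of such a GID (TwoPhaseTransactionGid(), used further
 : * below, is the authoritative helper; the exact format shown here is only
 : * illustrative):
 : *
 : *     snprintf(gid, sizeof(gid), "pg_gid_%u_%u", subid, xid);
 : *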
129 : * FAILOVER
130 : * ----------------------
131 : * The logical slot on the primary can be synced to the standby by specifying
132 : * failover = true when creating the subscription. Enabling failover allows us
133 : * to smoothly transition to the promoted standby, ensuring that we can
134 : * subscribe to the new primary without losing any data.
135 : *
136 : * RETAIN DEAD TUPLES
137 : * ----------------------
138 : * Each apply worker that has enabled the retain_dead_tuples option maintains
139 : * a non-removable transaction ID (oldest_nonremovable_xid) in shared memory
140 : * to prevent dead rows from being removed prematurely while the apply worker
141 : * still needs them to detect conflicts reliably. This also helps to retain
142 : * the required commit_ts module information, which is needed to detect
143 : * update_origin_differs and delete_origin_differs conflicts reliably, as
144 : * otherwise vacuum freeze could remove the required information.
145 : *
146 : * The logical replication launcher manages an internal replication slot named
147 : * "pg_conflict_detection". It asynchronously aggregates the non-removable
148 : * transaction ID from all apply workers to determine the appropriate xmin for
149 : * the slot, thereby retaining necessary tuples.
150 : *
151 : * The non-removable transaction ID in the apply worker is advanced to the
152 : * oldest running transaction ID once all concurrent transactions on the
153 : * publisher have been applied and flushed locally. The process involves:
154 : *
155 : * - RDT_GET_CANDIDATE_XID:
156 : * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
157 : * candidate xid.
158 : *
159 : * - RDT_REQUEST_PUBLISHER_STATUS:
160 : * Send a message to the walsender requesting the publisher status, which
161 : * includes the latest WAL write position and information about transactions
162 : * that are in the commit phase.
163 : *
164 : * - RDT_WAIT_FOR_PUBLISHER_STATUS:
165 : * Wait for the status from the walsender. After receiving the first status,
166 : * do not proceed if there are concurrent remote transactions that are still
167 : * in the commit phase. These transactions might have been assigned an
168 : * earlier commit timestamp but have not yet written the commit WAL record.
169 : * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
170 : * until all these transactions have completed.
171 : *
172 : * - RDT_WAIT_FOR_LOCAL_FLUSH:
173 : * Advance the non-removable transaction ID if the current flush location has
174 : * reached or surpassed the last received WAL position.
175 : *
176 : * The overall state progression is: GET_CANDIDATE_XID ->
177 : * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
178 : * REQUEST_PUBLISHER_STATUS until concurrent remote transactions end) ->
179 : * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
180 : *
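 : * A minimal outline of the resulting loop (the real logic lives in
 : * process_rdt_phase_transition() and the helper functions declared below):
 : *
 : *     switch (rdt_data->phase)
 : *     {
 : *         case RDT_GET_CANDIDATE_XID:
 : *             get_candidate_xid(rdt_data);
 : *             break;
 : *         case RDT_REQUEST_PUBLISHER_STATUS:
 : *             request_publisher_status(rdt_data);
 : *             break;
 : *         case RDT_WAIT_FOR_PUBLISHER_STATUS:
 : *             wait_for_publisher_status(rdt_data, status_received);
 : *             break;
 : *         case RDT_WAIT_FOR_LOCAL_FLUSH:
 : *             wait_for_local_flush(rdt_data);
 : *             break;
 : *     }
 : *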
181 : * Retaining the dead tuples for this period is sufficient for ensuring
182 : * eventual consistency using last-update-wins strategy, as dead tuples are
183 : * useful for detecting conflicts only during the application of concurrent
184 : * transactions from remote nodes. After applying and flushing all remote
185 : * transactions that occurred concurrently with the tuple DELETE, any
186 : * subsequent UPDATE from a remote node should have a later timestamp. In such
187 : * cases, it is acceptable to detect an update_missing scenario and convert the
188 : * UPDATE to an INSERT when applying it. But, detecting concurrent remote
189 : * transactions with earlier timestamps than the DELETE is necessary, as the
190 : * UPDATEs in remote transactions should be ignored if their timestamp is
191 : * earlier than that of the dead tuples.
192 : *
193 : * Note that advancing the non-removable transaction ID is not supported if the
194 : * publisher is also a physical standby. This is because the logical walsender
195 : * on the standby can only get the WAL replay position, but there may be more
196 : * WAL still being replicated from the primary, and that WAL could have an
197 : * earlier commit timestamp.
198 : *
199 : * Similarly, when the publisher has subscribed to another publisher,
200 : * information necessary for conflict detection cannot be retained for
201 : * changes from origins other than the publisher. This is because the publisher
202 : * lacks the information on concurrent transactions of other publishers to
203 : * which it subscribes. As the information on concurrent transactions is
204 : * unavailable beyond the subscriber's immediate publishers, the non-removable
205 : * transaction ID might be advanced prematurely before changes from other
206 : * origins have been fully applied.
207 : *
208 : * XXX Retaining information for changes from other origins might be possible
209 : * by requesting the subscription on that origin to enable retain_dead_tuples
210 : * and fetching the conflict detection slot.xmin along with the publisher's
211 : * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
212 : * wait for the remote slot's xmin to reach the oldest active transaction ID,
213 : * ensuring that all transactions from other origins have been applied on the
214 : * publisher, thereby getting the latest WAL position that includes all
215 : * concurrent changes. However, this approach may impact performance, so it
216 : * might not be worth the effort.
217 : *
218 : * XXX It seems feasible to get the latest commit's WAL location from the
219 : * publisher and wait until that is applied. However, we can't do that
220 : * because commit timestamps can regress: a commit with a later LSN is not
221 : * guaranteed to have a later timestamp than those with earlier LSNs. Having
222 : * said that, even if that were possible, it wouldn't improve performance much,
223 : * as the apply always lags and moves slowly compared with the transactions
224 : * on the publisher.
225 : *-------------------------------------------------------------------------
226 : */
227 :
228 : #include "postgres.h"
229 :
230 : #include <sys/stat.h>
231 : #include <unistd.h>
232 :
233 : #include "access/commit_ts.h"
234 : #include "access/table.h"
235 : #include "access/tableam.h"
236 : #include "access/twophase.h"
237 : #include "access/xact.h"
238 : #include "catalog/indexing.h"
239 : #include "catalog/pg_inherits.h"
240 : #include "catalog/pg_subscription.h"
241 : #include "catalog/pg_subscription_rel.h"
242 : #include "commands/subscriptioncmds.h"
243 : #include "commands/tablecmds.h"
244 : #include "commands/trigger.h"
245 : #include "executor/executor.h"
246 : #include "executor/execPartition.h"
247 : #include "libpq/pqformat.h"
248 : #include "miscadmin.h"
249 : #include "optimizer/optimizer.h"
250 : #include "parser/parse_relation.h"
251 : #include "pgstat.h"
252 : #include "postmaster/bgworker.h"
253 : #include "postmaster/interrupt.h"
254 : #include "postmaster/walwriter.h"
255 : #include "replication/conflict.h"
256 : #include "replication/logicallauncher.h"
257 : #include "replication/logicalproto.h"
258 : #include "replication/logicalrelation.h"
259 : #include "replication/logicalworker.h"
260 : #include "replication/origin.h"
261 : #include "replication/slot.h"
262 : #include "replication/walreceiver.h"
263 : #include "replication/worker_internal.h"
264 : #include "rewrite/rewriteHandler.h"
265 : #include "storage/buffile.h"
266 : #include "storage/ipc.h"
267 : #include "storage/lmgr.h"
268 : #include "storage/procarray.h"
269 : #include "tcop/tcopprot.h"
270 : #include "utils/acl.h"
271 : #include "utils/dynahash.h"
272 : #include "utils/guc.h"
273 : #include "utils/inval.h"
274 : #include "utils/lsyscache.h"
275 : #include "utils/memutils.h"
276 : #include "utils/pg_lsn.h"
277 : #include "utils/rel.h"
278 : #include "utils/rls.h"
279 : #include "utils/snapmgr.h"
280 : #include "utils/syscache.h"
281 : #include "utils/usercontext.h"
282 :
283 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
284 :
285 : typedef struct FlushPosition
286 : {
287 : dlist_node node;
288 : XLogRecPtr local_end;
289 : XLogRecPtr remote_end;
290 : } FlushPosition;
291 :
292 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
293 :
294 : typedef struct ApplyExecutionData
295 : {
296 : EState *estate; /* executor state, used to track resources */
297 :
298 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
299 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
300 :
301 : /* These fields are used when the target relation is partitioned: */
302 : ModifyTableState *mtstate; /* dummy ModifyTable state */
303 : PartitionTupleRouting *proute; /* partition routing info */
304 : } ApplyExecutionData;
305 :
306 : /* Struct for saving and restoring apply errcontext information */
307 : typedef struct ApplyErrorCallbackArg
308 : {
309 : LogicalRepMsgType command; /* 0 if invalid */
310 : LogicalRepRelMapEntry *rel;
311 :
312 : /* Remote node information */
313 : int remote_attnum; /* -1 if invalid */
314 : TransactionId remote_xid;
315 : XLogRecPtr finish_lsn;
316 : char *origin_name;
317 : } ApplyErrorCallbackArg;
318 :
319 : /*
320 : * The action to be taken for the changes in the transaction.
321 : *
322 : * TRANS_LEADER_APPLY:
323 : * This action means that we are in the leader apply worker or table sync
324 : * worker. The changes of the transaction are either directly applied or
325 : * are read from temporary files (for streaming transactions) and then
326 : * applied by the worker.
327 : *
328 : * TRANS_LEADER_SERIALIZE:
329 : * This action means that we are in the leader apply worker or table sync
330 : * worker. Changes are written to temporary files and then applied when the
331 : * final commit arrives.
332 : *
333 : * TRANS_LEADER_SEND_TO_PARALLEL:
334 : * This action means that we are in the leader apply worker and need to send
335 : * the changes to the parallel apply worker.
336 : *
337 : * TRANS_LEADER_PARTIAL_SERIALIZE:
338 : * This action means that we are in the leader apply worker and have sent some
339 : * changes directly to the parallel apply worker and the remaining changes are
340 : * serialized to a file due to a timeout while sending data. The parallel apply
341 : * worker will apply these serialized changes when the final commit arrives.
342 : *
343 : * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
344 : * serializing changes, the leader worker also needs to serialize the
345 : * STREAM_XXX message to a file, and wait for the parallel apply worker to
346 : * finish the transaction when processing the transaction finish command. So
347 : * this new action was introduced to keep the code and logic clear.
348 : *
349 : * TRANS_PARALLEL_APPLY:
350 : * This action means that we are in the parallel apply worker and changes of
351 : * the transaction are applied directly by the worker.
352 : */
353 : typedef enum
354 : {
355 : /* The action for non-streaming transactions. */
356 : TRANS_LEADER_APPLY,
357 :
358 : /* Actions for streaming transactions. */
359 : TRANS_LEADER_SERIALIZE,
360 : TRANS_LEADER_SEND_TO_PARALLEL,
361 : TRANS_LEADER_PARTIAL_SERIALIZE,
362 : TRANS_PARALLEL_APPLY,
363 : } TransApplyAction;
364 :
365 : /*
366 : * The phases involved in advancing the non-removable transaction ID.
367 : *
368 : * See comments atop worker.c for details of the transition between these
369 : * phases.
370 : */
371 : typedef enum
372 : {
373 : RDT_GET_CANDIDATE_XID,
374 : RDT_REQUEST_PUBLISHER_STATUS,
375 : RDT_WAIT_FOR_PUBLISHER_STATUS,
376 : RDT_WAIT_FOR_LOCAL_FLUSH
377 : } RetainDeadTuplesPhase;
378 :
379 : /*
380 : * Critical information for managing phase transitions within the
381 : * RetainDeadTuplesPhase.
382 : */
383 : typedef struct RetainDeadTuplesData
384 : {
385 : RetainDeadTuplesPhase phase; /* current phase */
386 : XLogRecPtr remote_lsn; /* WAL write position on the publisher */
387 :
388 : /*
389 : * Oldest transaction ID that was in the commit phase on the publisher.
390 : * Use FullTransactionId to prevent issues with transaction ID wraparound,
391 : * where a new remote_oldestxid could falsely appear to originate from the
392 : * past and block advancement.
393 : */
394 : FullTransactionId remote_oldestxid;
395 :
396 : /*
397 : * Next transaction ID to be assigned on the publisher. Use
398 : * FullTransactionId for consistency and to allow straightforward
399 : * comparisons with remote_oldestxid.
400 : */
401 : FullTransactionId remote_nextxid;
402 :
403 : TimestampTz reply_time; /* when the publisher responds with status */
404 :
405 : /*
406 : * Publisher transaction ID that must be awaited to complete before
407 : * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
408 : * FullTransactionId for the same reason as remote_nextxid.
409 : */
410 : FullTransactionId remote_wait_for;
411 :
412 : TransactionId candidate_xid; /* candidate for the non-removable
413 : * transaction ID */
414 : TimestampTz flushpos_update_time; /* when the remote flush position was
415 : * updated in final phase
416 : * (RDT_WAIT_FOR_LOCAL_FLUSH) */
417 :
418 : /*
419 : * The following fields are used to determine the timing for the next
420 : * round of transaction ID advancement.
421 : */
422 : TimestampTz last_recv_time; /* when the last message was received */
423 : TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
424 : int xid_advance_interval; /* how much time (ms) to wait before
425 : * attempting to advance the
426 : * non-removable transaction ID */
427 : } RetainDeadTuplesData;
428 :
429 : /*
430 : * The minimum (100ms) and maximum (3 minutes) intervals for advancing
431 : * non-removable transaction IDs. The maximum interval is a bit arbitrary but
432 : * is sufficient to not cause any undue network traffic.
433 : */
434 : #define MIN_XID_ADVANCE_INTERVAL 100
435 : #define MAX_XID_ADVANCE_INTERVAL 180000
436 :
437 : /* errcontext tracker */
438 : static ApplyErrorCallbackArg apply_error_callback_arg =
439 : {
440 : .command = 0,
441 : .rel = NULL,
442 : .remote_attnum = -1,
443 : .remote_xid = InvalidTransactionId,
444 : .finish_lsn = InvalidXLogRecPtr,
445 : .origin_name = NULL,
446 : };
447 :
448 : ErrorContextCallback *apply_error_context_stack = NULL;
449 :
450 : MemoryContext ApplyMessageContext = NULL;
451 : MemoryContext ApplyContext = NULL;
452 :
453 : /* per stream context for streaming transactions */
454 : static MemoryContext LogicalStreamingContext = NULL;
455 :
456 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
457 :
458 : Subscription *MySubscription = NULL;
459 : static bool MySubscriptionValid = false;
460 :
461 : static List *on_commit_wakeup_workers_subids = NIL;
462 :
463 : bool in_remote_transaction = false;
464 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
465 :
466 : /* fields valid only when processing streamed transaction */
467 : static bool in_streamed_transaction = false;
468 :
469 : static TransactionId stream_xid = InvalidTransactionId;
470 :
471 : /*
472 : * The number of changes applied by a parallel apply worker during one streaming
473 : * block.
474 : */
475 : static uint32 parallel_stream_nchanges = 0;
476 :
477 : /* Are we initializing an apply worker? */
478 : bool InitializingApplyWorker = false;
479 :
480 : /*
481 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
482 : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
483 : * Once we start skipping changes, we don't stop until we have skipped all changes
484 : * of the transaction, even if pg_subscription is updated and MySubscription->skiplsn
485 : * gets changed or reset meanwhile. Also, in streaming transaction cases (streaming = on),
486 : * we don't skip receiving and spooling the changes since we decide whether or not
487 : * to skip applying the changes only when starting to apply them. The subskiplsn is
488 : * cleared after successfully skipping the transaction or applying a non-empty
489 : * transaction; the latter prevents a mistakenly specified subskiplsn from
490 : * being left behind. Note that we cannot skip streaming transactions when using
491 : * parallel apply workers because we cannot get the finish LSN before applying
492 : * the changes. So, we don't start a parallel apply worker when the finish LSN
493 : * is set by the user.
494 : */
495 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
496 : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
497 :
498 : /* BufFile handle of the current streaming file */
499 : static BufFile *stream_fd = NULL;
500 :
501 : /*
502 : * The remote WAL position that has been applied and flushed locally. We record
503 : * and use this information both while sending feedback to the server and
504 : * advancing oldest_nonremovable_xid.
505 : */
506 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
507 :
508 : typedef struct SubXactInfo
509 : {
510 : TransactionId xid; /* XID of the subxact */
511 : int fileno; /* file number in the buffile */
512 : off_t offset; /* offset in the file */
513 : } SubXactInfo;
514 :
515 : /* Sub-transaction data for the current streaming transaction */
516 : typedef struct ApplySubXactData
517 : {
518 : uint32 nsubxacts; /* number of sub-transactions */
519 : uint32 nsubxacts_max; /* current capacity of subxacts */
520 : TransactionId subxact_last; /* xid of the last sub-transaction */
521 : SubXactInfo *subxacts; /* sub-xact offset in changes file */
522 : } ApplySubXactData;
523 :
524 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
525 :
526 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
527 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
528 :
529 : /*
530 : * Information about subtransactions of a given toplevel transaction.
531 : */
532 : static void subxact_info_write(Oid subid, TransactionId xid);
533 : static void subxact_info_read(Oid subid, TransactionId xid);
534 : static void subxact_info_add(TransactionId xid);
535 : static inline void cleanup_subxact_info(void);
536 :
537 : /*
538 : * Serialize and deserialize changes for a toplevel transaction.
539 : */
540 : static void stream_open_file(Oid subid, TransactionId xid,
541 : bool first_segment);
542 : static void stream_write_change(char action, StringInfo s);
543 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
544 : static void stream_close_file(void);
545 :
546 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
547 :
548 : static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
549 : bool status_received);
550 : static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
551 : static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
552 : bool status_received);
553 : static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
554 : static void request_publisher_status(RetainDeadTuplesData *rdt_data);
555 : static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
556 : bool status_received);
557 : static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
558 : static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
559 : bool new_xid_found);
560 :
561 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
562 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
563 : ResultRelInfo *relinfo,
564 : TupleTableSlot *remoteslot);
565 : static void apply_handle_update_internal(ApplyExecutionData *edata,
566 : ResultRelInfo *relinfo,
567 : TupleTableSlot *remoteslot,
568 : LogicalRepTupleData *newtup,
569 : Oid localindexoid);
570 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
571 : ResultRelInfo *relinfo,
572 : TupleTableSlot *remoteslot,
573 : Oid localindexoid);
574 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
575 : LogicalRepRelation *remoterel,
576 : Oid localidxoid,
577 : TupleTableSlot *remoteslot,
578 : TupleTableSlot **localslot);
579 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
580 : TupleTableSlot *remoteslot,
581 : LogicalRepTupleData *newtup,
582 : CmdType operation);
583 :
584 : /* Functions for skipping changes */
585 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
586 : static void stop_skipping_changes(void);
587 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
588 :
589 : /* Functions for apply error callback */
590 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
591 : static inline void reset_apply_error_context_info(void);
592 :
593 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
594 : ParallelApplyWorkerInfo **winfo);
595 :
596 : static void replorigin_reset(int code, Datum arg);
597 :
598 : /*
599 : * Form the origin name for the subscription.
600 : *
601 : * This is a common function for tablesync and other workers. Tablesync workers
602 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
603 : *
604 : * Return the name in the supplied buffer.
605 : */
606 : void
607 2546 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
608 : char *originname, Size szoriginname)
609 : {
610 2546 : if (OidIsValid(relid))
611 : {
612 : /* Replication origin name for tablesync workers. */
613 1490 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
614 : }
615 : else
616 : {
617 : /* Replication origin name for non-tablesync workers. */
618 1056 : snprintf(originname, szoriginname, "pg_%u", suboid);
619 : }
620 2546 : }
621 :
622 : /*
623 : * Should this worker apply changes for the given relation?
624 : *
625 : * This is mainly needed for the initial relation data sync, as that runs in a
626 : * separate worker process running in parallel, and we need some way to skip
627 : * changes coming to the leader apply worker during the sync of a table.
628 : *
629 : * Note we need to do a less-than-or-equal comparison for the SYNCDONE state
630 : * because it might hold the position of the end of the initial slot consistent
631 : * point WAL record + 1 (i.e. the start of the next record), and the next record
632 : * can be the COMMIT of the transaction we are now processing (which is what we
633 : * set remote_final_lsn to in apply_handle_begin).
634 : *
635 : * Note that for streaming transactions that are being applied in the parallel
636 : * apply worker, we disallow applying changes if the target table in the
637 : * subscription is not in the READY state, because we cannot decide whether to
638 : * apply the change as we won't know remote_final_lsn by that time.
639 : *
640 : * We already checked this in pa_can_start() before assigning the
641 : * streaming transaction to the parallel worker, but it also needs to be
642 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
643 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
644 : * while applying this transaction.
645 : */
646 : static bool
647 296234 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
648 : {
649 296234 : switch (MyLogicalRepWorker->type)
650 : {
651 20 : case WORKERTYPE_TABLESYNC:
652 20 : return MyLogicalRepWorker->relid == rel->localreloid;
653 :
654 136730 : case WORKERTYPE_PARALLEL_APPLY:
655 : /* We don't synchronize rel's that are in unknown state. */
656 136730 : if (rel->state != SUBREL_STATE_READY &&
657 0 : rel->state != SUBREL_STATE_UNKNOWN)
658 0 : ereport(ERROR,
659 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
660 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
661 : MySubscription->name),
662 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
663 :
664 136730 : return rel->state == SUBREL_STATE_READY;
665 :
666 159484 : case WORKERTYPE_APPLY:
667 159628 : return (rel->state == SUBREL_STATE_READY ||
668 144 : (rel->state == SUBREL_STATE_SYNCDONE &&
669 22 : rel->statelsn <= remote_final_lsn));
670 :
671 0 : case WORKERTYPE_UNKNOWN:
672 : /* Should never happen. */
673 0 : elog(ERROR, "Unknown worker type");
674 : }
675 :
676 0 : return false; /* dummy for compiler */
677 : }
678 :
679 : /*
680 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
681 : *
682 : * Start a transaction, if this is the first step (else we keep using the
683 : * existing transaction).
684 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
685 : */
686 : static void
687 297138 : begin_replication_step(void)
688 : {
689 297138 : SetCurrentStatementStartTimestamp();
690 :
691 297138 : if (!IsTransactionState())
692 : {
693 1870 : StartTransactionCommand();
694 1870 : maybe_reread_subscription();
695 : }
696 :
697 297134 : PushActiveSnapshot(GetTransactionSnapshot());
698 :
699 297134 : MemoryContextSwitchTo(ApplyMessageContext);
700 297134 : }
701 :
702 : /*
703 : * Finish up one step of a replication transaction.
704 : * Callers of begin_replication_step() must also call this.
705 : *
706 : * We don't close out the transaction here, but we should increment
707 : * the command counter to make the effects of this step visible.
708 : */
709 : static void
710 297066 : end_replication_step(void)
711 : {
712 297066 : PopActiveSnapshot();
713 :
714 297066 : CommandCounterIncrement();
715 297066 : }
716 :
717 : /*
718 : * Handle streamed transactions for both the leader apply worker and the
719 : * parallel apply workers.
720 : *
721 : * In the streaming case (receiving a block of the streamed transaction), for
722 : * serialize mode, simply redirect it to a file for the proper toplevel
723 : * transaction, and for parallel mode, the leader apply worker will send the
724 : * changes to parallel apply workers and the parallel apply worker will define
725 : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
726 : * messages will be applied by both leader apply worker and parallel apply
727 : * workers).
728 : *
729 : * Returns true for streamed transactions (when the change is either serialized
730 : * to file or sent to parallel apply worker), false otherwise (regular mode or
731 : * needs to be processed by parallel apply worker).
732 : *
733 : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
734 : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
735 : * to a parallel apply worker.
736 : */
737 : static bool
738 648832 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
739 : {
740 : TransactionId current_xid;
741 : ParallelApplyWorkerInfo *winfo;
742 : TransApplyAction apply_action;
743 : StringInfoData original_msg;
744 :
745 648832 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
746 :
747 : /* not in streaming mode */
748 648832 : if (apply_action == TRANS_LEADER_APPLY)
749 160238 : return false;
750 :
751 : Assert(TransactionIdIsValid(stream_xid));
752 :
753 : /*
754 : * The parallel apply worker needs the xid in this message to decide
755 : * whether to define a savepoint, so save the original message that has
756 : * not moved the cursor after the xid. We will serialize this message to a
757 : * file in PARTIAL_SERIALIZE mode.
758 : */
759 488594 : original_msg = *s;
760 :
761 : /*
762 : * We should have received XID of the subxact as the first part of the
763 : * message, so extract it.
764 : */
765 488594 : current_xid = pq_getmsgint(s, 4);
766 :
767 488594 : if (!TransactionIdIsValid(current_xid))
768 0 : ereport(ERROR,
769 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
770 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
771 :
772 488594 : switch (apply_action)
773 : {
774 205024 : case TRANS_LEADER_SERIALIZE:
775 : Assert(stream_fd);
776 :
777 : /* Add the new subxact to the array (unless already there). */
778 205024 : subxact_info_add(current_xid);
779 :
780 : /* Write the change to the current file */
781 205024 : stream_write_change(action, s);
782 205024 : return true;
783 :
784 136772 : case TRANS_LEADER_SEND_TO_PARALLEL:
785 : Assert(winfo);
786 :
787 : /*
788 : * XXX The publisher side doesn't always send relation/type update
789 : * messages after the streaming transaction, so also update the
790 : * relation/type in leader apply worker. See function
791 : * cleanup_rel_sync_cache.
792 : */
793 136772 : if (pa_send_data(winfo, s->len, s->data))
794 136772 : return (action != LOGICAL_REP_MSG_RELATION &&
795 : action != LOGICAL_REP_MSG_TYPE);
796 :
797 : /*
798 : * Switch to serialize mode when we are not able to send the
799 : * change to parallel apply worker.
800 : */
801 0 : pa_switch_to_partial_serialize(winfo, false);
802 :
803 : /* fall through */
804 10012 : case TRANS_LEADER_PARTIAL_SERIALIZE:
805 10012 : stream_write_change(action, &original_msg);
806 :
807 : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
808 10012 : return (action != LOGICAL_REP_MSG_RELATION &&
809 : action != LOGICAL_REP_MSG_TYPE);
810 :
811 136786 : case TRANS_PARALLEL_APPLY:
812 136786 : parallel_stream_nchanges += 1;
813 :
814 : /* Define a savepoint for a subxact if needed. */
815 136786 : pa_start_subtrans(current_xid, stream_xid);
816 136786 : return false;
817 :
818 0 : default:
819 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
820 : return false; /* silence compiler warning */
821 : }
822 : }
823 :
824 : /*
825 : * Executor state preparation for evaluation of constraint expressions,
826 : * indexes and triggers for the specified relation.
827 : *
828 : * Note that the caller must open and close any indexes to be updated.
829 : */
830 : static ApplyExecutionData *
831 296054 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
832 : {
833 : ApplyExecutionData *edata;
834 : EState *estate;
835 : RangeTblEntry *rte;
836 296054 : List *perminfos = NIL;
837 : ResultRelInfo *resultRelInfo;
838 :
839 296054 : edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
840 296054 : edata->targetRel = rel;
841 :
842 296054 : edata->estate = estate = CreateExecutorState();
843 :
844 296054 : rte = makeNode(RangeTblEntry);
845 296054 : rte->rtekind = RTE_RELATION;
846 296054 : rte->relid = RelationGetRelid(rel->localrel);
847 296054 : rte->relkind = rel->localrel->rd_rel->relkind;
848 296054 : rte->rellockmode = AccessShareLock;
849 :
850 296054 : addRTEPermissionInfo(&perminfos, rte);
851 :
852 296054 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
853 : bms_make_singleton(1));
854 :
855 296054 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
856 :
857 : /*
858 : * Use Relation opened by logicalrep_rel_open() instead of opening it
859 : * again.
860 : */
861 296054 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
862 :
863 : /*
864 : * We put the ResultRelInfo in the es_opened_result_relations list, even
865 : * though we don't populate the es_result_relations array. That's a bit
866 : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
867 : *
868 : * ExecOpenIndices() is not called here either, each execution path doing
869 : * an apply operation being responsible for that.
870 : */
871 296054 : estate->es_opened_result_relations =
872 296054 : lappend(estate->es_opened_result_relations, resultRelInfo);
873 :
874 296054 : estate->es_output_cid = GetCurrentCommandId(true);
875 :
876 : /* Prepare to catch AFTER triggers. */
877 296054 : AfterTriggerBeginQuery();
878 :
879 : /* other fields of edata remain NULL for now */
880 :
881 296054 : return edata;
882 : }
883 :
884 : /*
885 : * Finish any operations related to the executor state created by
886 : * create_edata_for_relation().
887 : */
888 : static void
889 296000 : finish_edata(ApplyExecutionData *edata)
890 : {
891 296000 : EState *estate = edata->estate;
892 :
893 : /* Handle any queued AFTER triggers. */
894 296000 : AfterTriggerEndQuery(estate);
895 :
896 : /* Shut down tuple routing, if any was done. */
897 296000 : if (edata->proute)
898 148 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
899 :
900 : /*
901 : * Cleanup. It might seem that we should call ExecCloseResultRelations()
902 : * here, but we intentionally don't. It would close the rel we added to
903 : * es_opened_result_relations above, which is wrong because we took no
904 : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
905 : * any other relations opened during execution.
906 : */
907 296000 : ExecResetTupleTable(estate->es_tupleTable, false);
908 296000 : FreeExecutorState(estate);
909 296000 : pfree(edata);
910 296000 : }
911 :
912 : * Evaluates default values for columns that we cannot map to remote
913 : * relation columns.
914 : * relation columns.
915 : *
916 : * This allows us to support tables which have more columns on the downstream
917 : * than on the upstream.
918 : */
919 : static void
920 151546 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
921 : TupleTableSlot *slot)
922 : {
923 151546 : TupleDesc desc = RelationGetDescr(rel->localrel);
924 151546 : int num_phys_attrs = desc->natts;
925 : int i;
926 : int attnum,
927 151546 : num_defaults = 0;
928 : int *defmap;
929 : ExprState **defexprs;
930 : ExprContext *econtext;
931 :
932 151546 : econtext = GetPerTupleExprContext(estate);
933 :
934 : /* We got all the data via replication, no need to evaluate anything. */
935 151546 : if (num_phys_attrs == rel->remoterel.natts)
936 71256 : return;
937 :
938 80290 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
939 80290 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
940 :
941 : Assert(rel->attrmap->maplen == num_phys_attrs);
942 421326 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
943 : {
944 : Expr *defexpr;
945 :
946 341036 : if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
947 18 : continue;
948 :
949 341018 : if (rel->attrmap->attnums[attnum] >= 0)
950 184536 : continue;
951 :
952 156482 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
953 :
954 156482 : if (defexpr != NULL)
955 : {
956 : /* Run the expression through planner */
957 140262 : defexpr = expression_planner(defexpr);
958 :
959 : /* Initialize executable expression in copycontext */
960 140262 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
961 140262 : defmap[num_defaults] = attnum;
962 140262 : num_defaults++;
963 : }
964 : }
965 :
966 220552 : for (i = 0; i < num_defaults; i++)
967 140262 : slot->tts_values[defmap[i]] =
968 140262 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
969 : }
970 :
971 : /*
972 : * Store tuple data into slot.
973 : *
974 : * Incoming data can be either text or binary format.
975 : */
976 : static void
977 296074 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
978 : LogicalRepTupleData *tupleData)
979 : {
980 296074 : int natts = slot->tts_tupleDescriptor->natts;
981 : int i;
982 :
983 296074 : ExecClearTuple(slot);
984 :
985 : /* Call the "in" function for each non-dropped, non-null attribute */
986 : Assert(natts == rel->attrmap->maplen);
987 1315116 : for (i = 0; i < natts; i++)
988 : {
989 1019042 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
990 1019042 : int remoteattnum = rel->attrmap->attnums[i];
991 :
992 1019042 : if (!att->attisdropped && remoteattnum >= 0)
993 605208 : {
994 605208 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
995 :
996 : Assert(remoteattnum < tupleData->ncols);
997 :
998 : /* Set attnum for error callback */
999 605208 : apply_error_callback_arg.remote_attnum = remoteattnum;
1000 :
1001 605208 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1002 : {
1003 : Oid typinput;
1004 : Oid typioparam;
1005 :
1006 284592 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1007 569184 : slot->tts_values[i] =
1008 284592 : OidInputFunctionCall(typinput, colvalue->data,
1009 : typioparam, att->atttypmod);
1010 284592 : slot->tts_isnull[i] = false;
1011 : }
1012 320616 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1013 : {
1014 : Oid typreceive;
1015 : Oid typioparam;
1016 :
1017 : /*
1018 : * In some code paths we may be asked to re-parse the same
1019 : * tuple data. Reset the StringInfo's cursor so that works.
1020 : */
1021 219960 : colvalue->cursor = 0;
1022 :
1023 219960 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1024 439920 : slot->tts_values[i] =
1025 219960 : OidReceiveFunctionCall(typreceive, colvalue,
1026 : typioparam, att->atttypmod);
1027 :
1028 : /* Trouble if it didn't eat the whole buffer */
1029 219960 : if (colvalue->cursor != colvalue->len)
1030 0 : ereport(ERROR,
1031 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1032 : errmsg("incorrect binary data format in logical replication column %d",
1033 : remoteattnum + 1)));
1034 219960 : slot->tts_isnull[i] = false;
1035 : }
1036 : else
1037 : {
1038 : /*
1039 : * NULL value from remote. (We don't expect to see
1040 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
1041 : * NULL.)
1042 : */
1043 100656 : slot->tts_values[i] = (Datum) 0;
1044 100656 : slot->tts_isnull[i] = true;
1045 : }
1046 :
1047 : /* Reset attnum for error callback */
1048 605208 : apply_error_callback_arg.remote_attnum = -1;
1049 : }
1050 : else
1051 : {
1052 : /*
1053 : * We assign NULL to dropped attributes and missing values
1054 : * (missing values should be later filled using
1055 : * slot_fill_defaults).
1056 : */
1057 413834 : slot->tts_values[i] = (Datum) 0;
1058 413834 : slot->tts_isnull[i] = true;
1059 : }
1060 : }
1061 :
1062 296074 : ExecStoreVirtualTuple(slot);
1063 296074 : }
1064 :
1065 : /*
1066 : * Replace updated columns with data from the LogicalRepTupleData struct.
1067 : * This is somewhat similar to heap_modify_tuple but also calls the type
1068 : * input functions on the user data.
1069 : *
1070 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
1071 : * columns provided in "tupleData" and leaving others as-is.
1072 : *
1073 : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
1074 : * storage for "srcslot". This is OK for current usage, but someday we may
1075 : * need to materialize "slot" at the end to make it independent of "srcslot".
1076 : */
1077 : static void
1078 63848 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
1079 : LogicalRepRelMapEntry *rel,
1080 : LogicalRepTupleData *tupleData)
1081 : {
1082 63848 : int natts = slot->tts_tupleDescriptor->natts;
1083 : int i;
1084 :
1085 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
1086 63848 : ExecClearTuple(slot);
1087 :
1088 : /*
1089 : * Copy all the column data from srcslot, so that we'll have valid values
1090 : * for unreplaced columns.
1091 : */
1092 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
1093 63848 : slot_getallattrs(srcslot);
1094 63848 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
1095 63848 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1096 :
1097 : /* Call the "in" function for each replaced attribute */
1098 : Assert(natts == rel->attrmap->maplen);
1099 318560 : for (i = 0; i < natts; i++)
1100 : {
1101 254712 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1102 254712 : int remoteattnum = rel->attrmap->attnums[i];
1103 :
1104 254712 : if (remoteattnum < 0)
1105 117038 : continue;
1106 :
1107 : Assert(remoteattnum < tupleData->ncols);
1108 :
1109 137674 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
1110 : {
1111 137668 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
1112 :
1113 : /* Set attnum for error callback */
1114 137668 : apply_error_callback_arg.remote_attnum = remoteattnum;
1115 :
1116 137668 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
1117 : {
1118 : Oid typinput;
1119 : Oid typioparam;
1120 :
1121 50860 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
1122 101720 : slot->tts_values[i] =
1123 50860 : OidInputFunctionCall(typinput, colvalue->data,
1124 : typioparam, att->atttypmod);
1125 50860 : slot->tts_isnull[i] = false;
1126 : }
1127 86808 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
1128 : {
1129 : Oid typreceive;
1130 : Oid typioparam;
1131 :
1132 : /*
1133 : * In some code paths we may be asked to re-parse the same
1134 : * tuple data. Reset the StringInfo's cursor so that works.
1135 : */
1136 86712 : colvalue->cursor = 0;
1137 :
1138 86712 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
1139 173424 : slot->tts_values[i] =
1140 86712 : OidReceiveFunctionCall(typreceive, colvalue,
1141 : typioparam, att->atttypmod);
1142 :
1143 : /* Trouble if it didn't eat the whole buffer */
1144 86712 : if (colvalue->cursor != colvalue->len)
1145 0 : ereport(ERROR,
1146 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
1147 : errmsg("incorrect binary data format in logical replication column %d",
1148 : remoteattnum + 1)));
1149 86712 : slot->tts_isnull[i] = false;
1150 : }
1151 : else
1152 : {
1153 : /* must be LOGICALREP_COLUMN_NULL */
1154 96 : slot->tts_values[i] = (Datum) 0;
1155 96 : slot->tts_isnull[i] = true;
1156 : }
1157 :
1158 : /* Reset attnum for error callback */
1159 137668 : apply_error_callback_arg.remote_attnum = -1;
1160 : }
1161 : }
1162 :
1163 : /* And finally, declare that "slot" contains a valid virtual tuple */
1164 63848 : ExecStoreVirtualTuple(slot);
1165 63848 : }
1166 :
1167 : /*
1168 : * Handle BEGIN message.
1169 : */
1170 : static void
1171 924 : apply_handle_begin(StringInfo s)
1172 : {
1173 : LogicalRepBeginData begin_data;
1174 :
1175 : /* There must not be an active streaming transaction. */
1176 : Assert(!TransactionIdIsValid(stream_xid));
1177 :
1178 924 : logicalrep_read_begin(s, &begin_data);
1179 924 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1180 :
1181 924 : remote_final_lsn = begin_data.final_lsn;
1182 :
1183 924 : maybe_start_skipping_changes(begin_data.final_lsn);
1184 :
1185 924 : in_remote_transaction = true;
1186 :
1187 924 : pgstat_report_activity(STATE_RUNNING, NULL);
1188 924 : }
1189 :
1190 : /*
1191 : * Handle COMMIT message.
1192 : *
1193 : * TODO, support tracking of multiple origins
1194 : */
1195 : static void
1196 858 : apply_handle_commit(StringInfo s)
1197 : {
1198 : LogicalRepCommitData commit_data;
1199 :
1200 858 : logicalrep_read_commit(s, &commit_data);
1201 :
1202 858 : if (commit_data.commit_lsn != remote_final_lsn)
1203 0 : ereport(ERROR,
1204 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1205 : errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
1206 : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1207 : LSN_FORMAT_ARGS(remote_final_lsn))));
1208 :
1209 858 : apply_handle_commit_internal(&commit_data);
1210 :
1211 : /* Process any tables that are being synchronized in parallel. */
1212 858 : process_syncing_tables(commit_data.end_lsn);
1213 :
1214 858 : pgstat_report_activity(STATE_IDLE, NULL);
1215 858 : reset_apply_error_context_info();
1216 858 : }
1217 :
1218 : /*
1219 : * Handle BEGIN PREPARE message.
1220 : */
1221 : static void
1222 32 : apply_handle_begin_prepare(StringInfo s)
1223 : {
1224 : LogicalRepPreparedTxnData begin_data;
1225 :
1226 : /* Tablesync should never receive prepare. */
1227 32 : if (am_tablesync_worker())
1228 0 : ereport(ERROR,
1229 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1230 : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1231 :
1232 : /* There must not be an active streaming transaction. */
1233 : Assert(!TransactionIdIsValid(stream_xid));
1234 :
1235 32 : logicalrep_read_begin_prepare(s, &begin_data);
1236 32 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1237 :
1238 32 : remote_final_lsn = begin_data.prepare_lsn;
1239 :
1240 32 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1241 :
1242 32 : in_remote_transaction = true;
1243 :
1244 32 : pgstat_report_activity(STATE_RUNNING, NULL);
1245 32 : }
1246 :
1247 : /*
1248 : * Common function to prepare the GID.
1249 : * Common function to prepare the transaction using an internally-generated GID.
1250 : static void
1251 46 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1252 : {
1253 : char gid[GIDSIZE];
1254 :
1255 : * Compute a unique GID for two_phase transactions. We don't use the GID of
1256 : * the prepared transaction sent by the server as that can lead to a deadlock
1257 : * when we have multiple subscriptions from the same node pointing to
1258 : * publications on the same node. See comments atop worker.c.
1259 : * the same node. See comments atop worker.c
1260 : */
1261 46 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1262 : gid, sizeof(gid));
1263 :
1264 : /*
1265 : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1266 : * called within the PrepareTransactionBlock below.
1267 : */
1268 46 : if (!IsTransactionBlock())
1269 : {
1270 46 : BeginTransactionBlock();
1271 46 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1272 : }
1273 :
1274 : /*
1275 : * Update origin state so we can restart streaming from correct position
1276 : * in case of crash.
1277 : */
1278 46 : replorigin_session_origin_lsn = prepare_data->end_lsn;
1279 46 : replorigin_session_origin_timestamp = prepare_data->prepare_time;
1280 :
1281 46 : PrepareTransactionBlock(gid);
1282 46 : }
1283 :
1284 : /*
1285 : * Handle PREPARE message.
1286 : */
1287 : static void
1288 30 : apply_handle_prepare(StringInfo s)
1289 : {
1290 : LogicalRepPreparedTxnData prepare_data;
1291 :
1292 30 : logicalrep_read_prepare(s, &prepare_data);
1293 :
1294 30 : if (prepare_data.prepare_lsn != remote_final_lsn)
1295 0 : ereport(ERROR,
1296 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1297 : errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
1298 : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1299 : LSN_FORMAT_ARGS(remote_final_lsn))));
1300 :
1301 : /*
1302 : * Unlike commit, here we always prepare the transaction even if no
1303 : * change has happened in this transaction or all changes were skipped. It
1304 : * is done this way because at commit prepared time, we won't know whether
1305 : * we have skipped preparing a transaction for those reasons.
1306 : *
1307 : * XXX We could optimize this such that at commit prepared time we first
1308 : * check whether we have prepared the transaction or not, but that doesn't
1309 : * seem worthwhile because such cases shouldn't be common.
1310 : */
1311 30 : begin_replication_step();
1312 :
1313 30 : apply_handle_prepare_internal(&prepare_data);
1314 :
1315 30 : end_replication_step();
1316 30 : CommitTransactionCommand();
1317 28 : pgstat_report_stat(false);
1318 :
1319 : /*
1320 : * It is okay not to set the local_end LSN for the prepare because we
1321 : * always flush the prepare record. So, we can send the acknowledgment of
1322 : * the remote_end LSN as soon as prepare is finished.
1323 : *
1324 : * XXX For the sake of consistency with commit, we could have set it with
1325 : * the LSN of the prepare, but as of now we don't track that value (as we do
1326 : * XactLastCommitEnd), and adding it for this purpose doesn't seem worth
1327 : * it.
1328 : */
1329 28 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1330 :
1331 28 : in_remote_transaction = false;
1332 :
1333 : /* Process any tables that are being synchronized in parallel. */
1334 28 : process_syncing_tables(prepare_data.end_lsn);
1335 :
1336 : /*
1337 : * Since we have already prepared the transaction, in a case where the
1338 : * server crashes before clearing the subskiplsn, it will be left but the
1339 : * transaction won't be resent. But that's okay because it's a rare case
1340 : * and the subskiplsn will be cleared when finishing the next transaction.
1341 : */
1342 28 : stop_skipping_changes();
1343 28 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1344 :
1345 28 : pgstat_report_activity(STATE_IDLE, NULL);
1346 28 : reset_apply_error_context_info();
1347 28 : }
1348 :
1349 : /*
1350 : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1351 : *
1352 : * Note that we don't need to wait here if the transaction was prepared in a
1353 : * parallel apply worker. In that case, we have already waited for the prepare
1354 : * to finish in apply_handle_stream_prepare() which will ensure all the
1355 : * operations in that transaction have happened in the subscriber, so no
1356 : * concurrent transaction can cause deadlock or transaction dependency issues.
1357 : */
1358 : static void
1359 40 : apply_handle_commit_prepared(StringInfo s)
1360 : {
1361 : LogicalRepCommitPreparedTxnData prepare_data;
1362 : char gid[GIDSIZE];
1363 :
1364 40 : logicalrep_read_commit_prepared(s, &prepare_data);
1365 40 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1366 :
1367 : /* Compute GID for two_phase transactions. */
1368 40 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1369 : gid, sizeof(gid));
1370 :
1371 : /* There is no transaction when COMMIT PREPARED is called */
1372 40 : begin_replication_step();
1373 :
1374 : /*
1375 : * Update origin state so we can restart streaming from correct position
1376 : * in case of crash.
1377 : */
1378 40 : replorigin_session_origin_lsn = prepare_data.end_lsn;
1379 40 : replorigin_session_origin_timestamp = prepare_data.commit_time;
1380 :
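     : /* Commit the transaction previously prepared under this GID. */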
1381 40 : FinishPreparedTransaction(gid, true);
1382 40 : end_replication_step();
1383 40 : CommitTransactionCommand();
1384 40 : pgstat_report_stat(false);
1385 :
1386 40 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1387 40 : in_remote_transaction = false;
1388 :
1389 : /* Process any tables that are being synchronized in parallel. */
1390 40 : process_syncing_tables(prepare_data.end_lsn);
1391 :
1392 40 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1393 :
1394 40 : pgstat_report_activity(STATE_IDLE, NULL);
1395 40 : reset_apply_error_context_info();
1396 40 : }
1397 :
1398 : /*
1399 : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1400 : *
1401 : * Note that we don't need to wait here if the transaction was prepared in a
1402 : * parallel apply worker. In that case, we have already waited for the prepare
1403 : * to finish in apply_handle_stream_prepare() which will ensure all the
1404 : * operations in that transaction have happened in the subscriber, so no
1405 : * concurrent transaction can cause deadlock or transaction dependency issues.
1406 : */
1407 : static void
1408 10 : apply_handle_rollback_prepared(StringInfo s)
1409 : {
1410 : LogicalRepRollbackPreparedTxnData rollback_data;
1411 : char gid[GIDSIZE];
1412 :
1413 10 : logicalrep_read_rollback_prepared(s, &rollback_data);
1414 10 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1415 :
1416 : /* Compute GID for two_phase transactions. */
1417 10 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1418 : gid, sizeof(gid));
1419 :
1420 : /*
1421 : * It is possible that we haven't received the prepare because it occurred
1422 : * before the walsender reached a consistent point, or because two_phase
1423 : * was not yet enabled at that time. In such cases, we need to skip the
1424 : * rollback prepared.
1425 : */
1426 10 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1427 : rollback_data.prepare_time))
1428 : {
1429 : /*
1430 : * Update origin state so we can restart streaming from correct
1431 : * position in case of crash.
1432 : */
1433 10 : replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1434 10 : replorigin_session_origin_timestamp = rollback_data.rollback_time;
1435 :
1436 : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1437 10 : begin_replication_step();
1438 10 : FinishPreparedTransaction(gid, false);
1439 10 : end_replication_step();
1440 10 : CommitTransactionCommand();
1441 :
1442 10 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1443 : }
1444 :
1445 10 : pgstat_report_stat(false);
1446 :
1447 : /*
1448 : * It is okay not to set the local_end LSN for the rollback of prepared
1449 : * transaction because we always flush the WAL record for it. See
1450 : * apply_handle_prepare.
1451 : */
1452 10 : store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
1453 10 : in_remote_transaction = false;
1454 :
1455 : /* Process any tables that are being synchronized in parallel. */
1456 10 : process_syncing_tables(rollback_data.rollback_end_lsn);
1457 :
1458 10 : pgstat_report_activity(STATE_IDLE, NULL);
1459 10 : reset_apply_error_context_info();
1460 10 : }
1461 :
1462 : /*
1463 : * Handle STREAM PREPARE.
1464 : */
1465 : static void
1466 22 : apply_handle_stream_prepare(StringInfo s)
1467 : {
1468 : LogicalRepPreparedTxnData prepare_data;
1469 : ParallelApplyWorkerInfo *winfo;
1470 : TransApplyAction apply_action;
1471 :
1472 : /* Save the message before it is consumed. */
1473 22 : StringInfoData original_msg = *s;
1474 :
1475 22 : if (in_streamed_transaction)
1476 0 : ereport(ERROR,
1477 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1478 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1479 :
1480 : /* Tablesync should never receive prepare. */
1481 22 : if (am_tablesync_worker())
1482 0 : ereport(ERROR,
1483 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1484 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1485 :
1486 22 : logicalrep_read_stream_prepare(s, &prepare_data);
1487 22 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1488 :
1489 22 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1490 :
1491 22 : switch (apply_action)
1492 : {
1493 10 : case TRANS_LEADER_APPLY:
1494 :
1495 : /*
1496 : * The transaction has been serialized to file, so replay all the
1497 : * spooled operations.
1498 : */
1499 10 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1500 : prepare_data.xid, prepare_data.prepare_lsn);
1501 :
1502 : /* Mark the transaction as prepared. */
1503 10 : apply_handle_prepare_internal(&prepare_data);
1504 :
1505 10 : CommitTransactionCommand();
1506 :
1507 : /*
1508 : * It is okay not to set the local_end LSN for the prepare because
1509 : * we always flush the prepare record. See apply_handle_prepare.
1510 : */
1511 10 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1512 :
1513 10 : in_remote_transaction = false;
1514 :
1515 : /* Unlink the files with serialized changes and subxact info. */
1516 10 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1517 :
1518 10 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1519 10 : break;
1520 :
1521 4 : case TRANS_LEADER_SEND_TO_PARALLEL:
1522 : Assert(winfo);
1523 :
1524 4 : if (pa_send_data(winfo, s->len, s->data))
1525 : {
1526 : /* Finish processing the streaming transaction. */
1527 4 : pa_xact_finish(winfo, prepare_data.end_lsn);
1528 4 : break;
1529 : }
1530 :
1531 : /*
1532 : * Switch to serialize mode when we are not able to send the
1533 : * change to parallel apply worker.
1534 : */
1535 0 : pa_switch_to_partial_serialize(winfo, true);
1536 :
1537 : /* fall through */
1538 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1539 : Assert(winfo);
1540 :
1541 2 : stream_open_and_write_change(prepare_data.xid,
1542 : LOGICAL_REP_MSG_STREAM_PREPARE,
1543 : &original_msg);
1544 :
1545 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1546 :
1547 : /* Finish processing the streaming transaction. */
1548 2 : pa_xact_finish(winfo, prepare_data.end_lsn);
1549 2 : break;
1550 :
1551 6 : case TRANS_PARALLEL_APPLY:
1552 :
1553 : /*
1554 : * If the parallel apply worker is applying spooled messages then
1555 : * close the file before preparing.
1556 : */
1557 6 : if (stream_fd)
1558 2 : stream_close_file();
1559 :
1560 6 : begin_replication_step();
1561 :
1562 : /* Mark the transaction as prepared. */
1563 6 : apply_handle_prepare_internal(&prepare_data);
1564 :
1565 6 : end_replication_step();
1566 :
1567 6 : CommitTransactionCommand();
1568 :
1569 : /*
1570 : * It is okay not to set the local_end LSN for the prepare because
1571 : * we always flush the prepare record. See apply_handle_prepare.
1572 : */
1573 6 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1574 :
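     : /*
     : * Mark the transaction as finished before releasing the transaction
     : * lock, as is also done in apply_handle_stream_commit.
     : */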
1575 6 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1576 6 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1577 :
1578 6 : pa_reset_subtrans();
1579 :
1580 6 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1581 6 : break;
1582 :
1583 0 : default:
1584 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1585 : break;
1586 : }
1587 :
1588 22 : pgstat_report_stat(false);
1589 :
1590 : /* Process any tables that are being synchronized in parallel. */
1591 22 : process_syncing_tables(prepare_data.end_lsn);
1592 :
1593 : /*
1594 : * Similar to prepare case, the subskiplsn could be left in a case of
1595 : * server crash but it's okay. See the comments in apply_handle_prepare().
1596 : */
1597 22 : stop_skipping_changes();
1598 22 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1599 :
1600 22 : pgstat_report_activity(STATE_IDLE, NULL);
1601 :
1602 22 : reset_apply_error_context_info();
1603 22 : }
1604 :
1605 : /*
1606 : * Handle ORIGIN message.
1607 : *
1608 : * TODO, support tracking of multiple origins
1609 : */
1610 : static void
1611 18 : apply_handle_origin(StringInfo s)
1612 : {
1613 : /*
1614 : * An ORIGIN message can only come inside a streamed transaction or inside
1615 : * a remote transaction, and before any actual writes.
1616 : */
1617 18 : if (!in_streamed_transaction &&
1618 28 : (!in_remote_transaction ||
1619 14 : (IsTransactionState() && !am_tablesync_worker())))
1620 0 : ereport(ERROR,
1621 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1622 : errmsg_internal("ORIGIN message sent out of order")));
1623 18 : }
1624 :
1625 : /*
1626 : * Initialize fileset (if not already done).
1627 : *
1628 : * Create a new file when first_segment is true, otherwise open the existing
1629 : * file.
1630 : */
1631 : void
1632 724 : stream_start_internal(TransactionId xid, bool first_segment)
1633 : {
1634 724 : begin_replication_step();
1635 :
1636 : /*
1637 : * Initialize the worker's stream_fileset if we haven't done so yet. It
1638 : * will be used for the entire lifetime of the worker, so create it in a
1639 : * permanent context. We create it on the very first streaming message from
1640 : * any transaction and then reuse it for this and other streaming
1641 : * transactions. We could instead create the fileset at worker start, but
1642 : * then we couldn't be sure it would ever be used.
1643 : */
1644 724 : if (!MyLogicalRepWorker->stream_fileset)
1645 : {
1646 : MemoryContext oldctx;
1647 :
1648 28 : oldctx = MemoryContextSwitchTo(ApplyContext);
1649 :
1650 28 : MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1651 28 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1652 :
1653 28 : MemoryContextSwitchTo(oldctx);
1654 : }
1655 :
1656 : /* Open the spool file for this transaction. */
1657 724 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1658 :
1659 : /* If this is not the first segment, open existing subxact file. */
1660 724 : if (!first_segment)
1661 660 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1662 :
1663 724 : end_replication_step();
1664 724 : }
1665 :
1666 : /*
1667 : * Handle STREAM START message.
1668 : */
1669 : static void
1670 1676 : apply_handle_stream_start(StringInfo s)
1671 : {
1672 : bool first_segment;
1673 : ParallelApplyWorkerInfo *winfo;
1674 : TransApplyAction apply_action;
1675 :
1676 : /* Save the message before it is consumed. */
1677 1676 : StringInfoData original_msg = *s;
1678 :
1679 1676 : if (in_streamed_transaction)
1680 0 : ereport(ERROR,
1681 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1682 : errmsg_internal("duplicate STREAM START message")));
1683 :
1684 : /* There must not be an active streaming transaction. */
1685 : Assert(!TransactionIdIsValid(stream_xid));
1686 :
1687 : /* notify handle methods we're processing a remote transaction */
1688 1676 : in_streamed_transaction = true;
1689 :
1690 : /* extract XID of the top-level transaction */
1691 1676 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1692 :
1693 1676 : if (!TransactionIdIsValid(stream_xid))
1694 0 : ereport(ERROR,
1695 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1696 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1697 :
1698 1676 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1699 :
1700 : /* Try to allocate a worker for the streaming transaction. */
1701 1676 : if (first_segment)
1702 164 : pa_allocate_worker(stream_xid);
1703 :
1704 1676 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1705 :
1706 1676 : switch (apply_action)
1707 : {
1708 684 : case TRANS_LEADER_SERIALIZE:
1709 :
1710 : /*
1711 : * Function stream_start_internal starts a transaction. This
1712 : * transaction will be committed on the stream stop unless it is a
1713 : * tablesync worker in which case it will be committed after
1714 : * processing all the messages. We need this transaction for
1715 : * handling the BufFile, used for serializing the streaming data
1716 : * and subxact info.
1717 : */
1718 684 : stream_start_internal(stream_xid, first_segment);
1719 684 : break;
1720 :
1721 484 : case TRANS_LEADER_SEND_TO_PARALLEL:
1722 : Assert(winfo);
1723 :
1724 : /*
1725 : * Once we start serializing the changes, the parallel apply
1726 : * worker will wait for the leader to release the stream lock
1727 : * until the end of the transaction. So, we don't need to release
1728 : * the lock or increment the stream count in that case.
1729 : */
1730 484 : if (pa_send_data(winfo, s->len, s->data))
1731 : {
1732 : /*
1733 : * Unlock the shared object lock so that the parallel apply
1734 : * worker can continue to receive changes.
1735 : */
1736 476 : if (!first_segment)
1737 430 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1738 :
1739 : /*
1740 : * Increment the number of streaming blocks waiting to be
1741 : * processed by parallel apply worker.
1742 : */
1743 476 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1744 :
1745 : /* Cache the parallel apply worker for this transaction. */
1746 476 : pa_set_stream_apply_worker(winfo);
1747 476 : break;
1748 : }
1749 :
1750 : /*
1751 : * Switch to serialize mode when we are not able to send the
1752 : * change to parallel apply worker.
1753 : */
1754 8 : pa_switch_to_partial_serialize(winfo, !first_segment);
1755 :
1756 : /* fall through */
1757 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1758 : Assert(winfo);
1759 :
1760 : /*
1761 : * Open the spool file unless it was already opened when switching
1762 : * to serialize mode. The transaction started in
1763 : * stream_start_internal will be committed on the stream stop.
1764 : */
1765 30 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1766 22 : stream_start_internal(stream_xid, first_segment);
1767 :
1768 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1769 :
1770 : /* Cache the parallel apply worker for this transaction. */
1771 30 : pa_set_stream_apply_worker(winfo);
1772 30 : break;
1773 :
1774 486 : case TRANS_PARALLEL_APPLY:
1775 486 : if (first_segment)
1776 : {
1777 : /* Hold the lock until the end of the transaction. */
1778 54 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1779 54 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1780 :
1781 : /*
1782 : * Signal the leader apply worker, as it may be waiting for
1783 : * us.
1784 : */
1785 54 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
1786 : }
1787 :
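     : /* Reset the counter of changes applied in this streaming chunk. */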
1788 486 : parallel_stream_nchanges = 0;
1789 486 : break;
1790 :
1791 0 : default:
1792 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1793 : break;
1794 : }
1795 :
1796 1676 : pgstat_report_activity(STATE_RUNNING, NULL);
1797 1676 : }
1798 :
1799 : /*
1800 : * Update the information about subxacts and close the file.
1801 : *
1802 : * This function should only be called after stream_start_internal has been
1803 : * called.
1804 : */
1805 : void
1806 724 : stream_stop_internal(TransactionId xid)
1807 : {
1808 : /*
1809 : * Serialize information about subxacts for the toplevel transaction, then
1810 : * close the stream messages spool file.
1811 : */
1812 724 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1813 724 : stream_close_file();
1814 :
1815 : /* We must be in a valid transaction state */
1816 : Assert(IsTransactionState());
1817 :
1818 : /* Commit the per-stream transaction */
1819 724 : CommitTransactionCommand();
1820 :
1821 : /* Reset per-stream context */
1822 724 : MemoryContextReset(LogicalStreamingContext);
1823 724 : }
1824 :
1825 : /*
1826 : * Handle STREAM STOP message.
1827 : */
1828 : static void
1829 1674 : apply_handle_stream_stop(StringInfo s)
1830 : {
1831 : ParallelApplyWorkerInfo *winfo;
1832 : TransApplyAction apply_action;
1833 :
1834 1674 : if (!in_streamed_transaction)
1835 0 : ereport(ERROR,
1836 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1837 : errmsg_internal("STREAM STOP message without STREAM START")));
1838 :
1839 1674 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1840 :
1841 1674 : switch (apply_action)
1842 : {
1843 684 : case TRANS_LEADER_SERIALIZE:
1844 684 : stream_stop_internal(stream_xid);
1845 684 : break;
1846 :
1847 476 : case TRANS_LEADER_SEND_TO_PARALLEL:
1848 : Assert(winfo);
1849 :
1850 : /*
1851 : * Lock before sending the STREAM_STOP message so that the leader
1852 : * can hold the lock first and the parallel apply worker will wait
1853 : * for the leader to release the lock. See Locking Considerations atop
1854 : * applyparallelworker.c.
1855 : */
1856 476 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1857 :
1858 476 : if (pa_send_data(winfo, s->len, s->data))
1859 : {
1860 476 : pa_set_stream_apply_worker(NULL);
1861 476 : break;
1862 : }
1863 :
1864 : /*
1865 : * Switch to serialize mode when we are not able to send the
1866 : * change to parallel apply worker.
1867 : */
1868 0 : pa_switch_to_partial_serialize(winfo, true);
1869 :
1870 : /* fall through */
1871 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1872 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1873 30 : stream_stop_internal(stream_xid);
1874 30 : pa_set_stream_apply_worker(NULL);
1875 30 : break;
1876 :
1877 484 : case TRANS_PARALLEL_APPLY:
1878 484 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1879 : parallel_stream_nchanges);
1880 :
1881 : /*
1882 : * By the time the parallel apply worker is processing the changes in
1883 : * the current streaming block, the leader apply worker may have
1884 : * sent multiple streaming blocks. This can lead to the parallel apply
1885 : * worker starting to wait even when there are more chunks of streams
1886 : * in the queue. So, try to lock only if there is no message left
1887 : * in the queue. See Locking Considerations atop
1888 : * applyparallelworker.c.
1889 : *
1890 : * Note that here we have a race condition where we can start
1891 : * waiting even when there are pending streaming chunks. This can
1892 : * happen if the leader sends another streaming block and acquires
1893 : * the stream lock again after the parallel apply worker checks
1894 : * that there is no pending streaming block and before it actually
1895 : * starts waiting on a lock. We can handle this case by not
1896 : * allowing the leader to increment the stream block count during
1897 : * the time the parallel apply worker acquires the lock, but it is not
1898 : * clear whether that is worth the complexity.
1899 : *
1900 : * Now, if this missed chunk contains rollback to savepoint, then
1901 : * there is a risk of deadlock which probably shouldn't happen
1902 : * after restart.
1903 : */
1904 484 : pa_decr_and_wait_stream_block();
1905 480 : break;
1906 :
1907 0 : default:
1908 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1909 : break;
1910 : }
1911 :
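     : /* Done with this streaming chunk; reset the streamed-transaction state. */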
1912 1670 : in_streamed_transaction = false;
1913 1670 : stream_xid = InvalidTransactionId;
1914 :
1915 : /*
1916 : * The parallel apply worker could be in a transaction in which case we
1917 : * need to report the state as STATE_IDLEINTRANSACTION.
1918 : */
1919 1670 : if (IsTransactionOrTransactionBlock())
1920 480 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1921 : else
1922 1190 : pgstat_report_activity(STATE_IDLE, NULL);
1923 :
1924 1670 : reset_apply_error_context_info();
1925 1670 : }
1926 :
1927 : /*
1928 : * Helper function to handle STREAM ABORT message when the transaction was
1929 : * serialized to file.
1930 : */
1931 : static void
1932 28 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1933 : {
1934 : /*
1935 : * If the two XIDs are the same, it's in fact an abort of the toplevel
1936 : * xact, so just delete the files with the serialized info.
1937 : */
1938 28 : if (xid == subxid)
1939 2 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1940 : else
1941 : {
1942 : /*
1943 : * OK, so it's a subxact. We need to read the subxact file for the
1944 : * toplevel transaction, determine the offset tracked for the subxact,
1945 : * and truncate the file with changes. We also remove the subxacts
1946 : * with higher offsets (or rather higher XIDs).
1947 : *
1948 : * We intentionally scan the array from the tail, because we're likely
1949 : * aborting a change for the most recent subtransactions.
1950 : *
1951 : * We can't use a binary search here as subxact XIDs won't
1952 : * necessarily arrive in sorted order; consider the case where we have
1953 : * released the savepoints for multiple subtransactions and then
1954 : * performed a rollback to savepoint for one of the earlier
1955 : * sub-transactions.
1956 : */
1957 : int64 i;
1958 : int64 subidx;
1959 : BufFile *fd;
1960 26 : bool found = false;
1961 : char path[MAXPGPATH];
1962 :
1963 26 : subidx = -1;
1964 26 : begin_replication_step();
1965 26 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1966 :
1967 30 : for (i = subxact_data.nsubxacts; i > 0; i--)
1968 : {
1969 22 : if (subxact_data.subxacts[i - 1].xid == subxid)
1970 : {
1971 18 : subidx = (i - 1);
1972 18 : found = true;
1973 18 : break;
1974 : }
1975 : }
1976 :
1977 : /*
1978 : * If it's an empty sub-transaction then we will not find the subxid
1979 : * here, so just clean up the subxact info and return.
1980 : */
1981 26 : if (!found)
1982 : {
1983 : /* Cleanup the subxact info */
1984 8 : cleanup_subxact_info();
1985 8 : end_replication_step();
1986 8 : CommitTransactionCommand();
1987 8 : return;
1988 : }
1989 :
1990 : /* open the changes file */
1991 18 : changes_filename(path, MyLogicalRepWorker->subid, xid);
1992 18 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
1993 : O_RDWR, false);
1994 :
1995 : /* OK, truncate the file at the right offset */
1996 18 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
1997 18 : subxact_data.subxacts[subidx].offset);
1998 18 : BufFileClose(fd);
1999 :
2000 : /* discard the subxacts added later */
2001 18 : subxact_data.nsubxacts = subidx;
2002 :
2003 : /* write the updated subxact list */
2004 18 : subxact_info_write(MyLogicalRepWorker->subid, xid);
2005 :
2006 18 : end_replication_step();
2007 18 : CommitTransactionCommand();
2008 : }
2009 : }
2010 :
2011 : /*
2012 : * Handle STREAM ABORT message.
2013 : */
2014 : static void
2015 76 : apply_handle_stream_abort(StringInfo s)
2016 : {
2017 : TransactionId xid;
2018 : TransactionId subxid;
2019 : LogicalRepStreamAbortData abort_data;
2020 : ParallelApplyWorkerInfo *winfo;
2021 : TransApplyAction apply_action;
2022 :
2023 : /* Save the message before it is consumed. */
2024 76 : StringInfoData original_msg = *s;
2025 : bool toplevel_xact;
2026 :
2027 76 : if (in_streamed_transaction)
2028 0 : ereport(ERROR,
2029 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2030 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
2031 :
2032 : /* We receive abort information only when we can apply in parallel. */
2033 76 : logicalrep_read_stream_abort(s, &abort_data,
2034 76 : MyLogicalRepWorker->parallel_apply);
2035 :
2036 76 : xid = abort_data.xid;
2037 76 : subxid = abort_data.subxid;
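     : /* An abort of the toplevel transaction is indicated by matching XIDs. */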
2038 76 : toplevel_xact = (xid == subxid);
2039 :
2040 76 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
2041 :
2042 76 : apply_action = get_transaction_apply_action(xid, &winfo);
2043 :
2044 76 : switch (apply_action)
2045 : {
2046 28 : case TRANS_LEADER_APPLY:
2047 :
2048 : /*
2049 : * We are in the leader apply worker and the transaction has been
2050 : * serialized to file.
2051 : */
2052 28 : stream_abort_internal(xid, subxid);
2053 :
2054 28 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2055 28 : break;
2056 :
2057 20 : case TRANS_LEADER_SEND_TO_PARALLEL:
2058 : Assert(winfo);
2059 :
2060 : /*
2061 : * For the case of aborting the subtransaction, we increment the
2062 : * number of streaming blocks and take the lock again before
2063 : * sending the STREAM_ABORT to ensure that the parallel apply
2064 : * worker will wait on the lock for the next set of changes after
2065 : * processing the STREAM_ABORT message if it is not already
2066 : * waiting for a STREAM_STOP message.
2067 : *
2068 : * It is important to perform this locking before sending the
2069 : * STREAM_ABORT message so that the leader can hold the lock first
2070 : * and the parallel apply worker will wait for the leader to
2071 : * release the lock. This is the same as what we do in
2072 : * apply_handle_stream_stop. See Locking Considerations atop
2073 : * applyparallelworker.c.
2074 : */
2075 20 : if (!toplevel_xact)
2076 : {
2077 18 : pa_unlock_stream(xid, AccessExclusiveLock);
2078 18 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
2079 18 : pa_lock_stream(xid, AccessExclusiveLock);
2080 : }
2081 :
2082 20 : if (pa_send_data(winfo, s->len, s->data))
2083 : {
2084 : /*
2085 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
2086 : * wait here for the parallel apply worker to finish as that
2087 : * is not required to maintain the commit order and won't have
2088 : * the risk of failures due to transaction dependencies and
2089 : * deadlocks. However, it is possible that before the parallel
2090 : * worker finishes and we clear the worker info, the xid
2091 : * wraparound happens on the upstream and a new transaction
2092 : * with the same xid can appear and that can lead to duplicate
2093 : * entries in ParallelApplyTxnHash. Yet another problem could
2094 : * be that we may have serialized the changes in partial
2095 : * serialize mode and the file containing xact changes may
2096 : * already exist, and after xid wraparound trying to create
2097 : * the file for the same xid can lead to an error. To avoid
2098 : * these problems, we decide to wait for the aborts to finish.
2099 : *
2100 : * Note, it is okay to not update the flush location position
2101 : * for aborts, as in the worst case that means such a transaction
2102 : * won't be sent again after restart.
2103 : */
2104 20 : if (toplevel_xact)
2105 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2106 :
2107 20 : break;
2108 : }
2109 :
2110 : /*
2111 : * Switch to serialize mode when we are not able to send the
2112 : * change to parallel apply worker.
2113 : */
2114 0 : pa_switch_to_partial_serialize(winfo, true);
2115 :
2116 : /* fall through */
2117 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2118 : Assert(winfo);
2119 :
2120 : /*
2121 : * Parallel apply worker might have applied some changes, so write
2122 : * the STREAM_ABORT message so that it can rollback the
2123 : * subtransaction if needed.
2124 : */
2125 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
2126 : &original_msg);
2127 :
2128 4 : if (toplevel_xact)
2129 : {
2130 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2131 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
2132 : }
2133 4 : break;
2134 :
2135 24 : case TRANS_PARALLEL_APPLY:
2136 :
2137 : /*
2138 : * If the parallel apply worker is applying spooled messages then
2139 : * close the file before aborting.
2140 : */
2141 24 : if (toplevel_xact && stream_fd)
2142 2 : stream_close_file();
2143 :
2144 24 : pa_stream_abort(&abort_data);
2145 :
2146 : /*
2147 : * We need to wait after processing rollback to savepoint for the
2148 : * next set of changes.
2149 : *
2150 : * We have a race condition here due to which we can start waiting
2151 : * even when there are more chunks of streams in the queue. See
2152 : * apply_handle_stream_stop.
2153 : */
2154 24 : if (!toplevel_xact)
2155 20 : pa_decr_and_wait_stream_block();
2156 :
2157 24 : elog(DEBUG1, "finished processing the STREAM ABORT command");
2158 24 : break;
2159 :
2160 0 : default:
2161 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2162 : break;
2163 : }
2164 :
2165 76 : reset_apply_error_context_info();
2166 76 : }
2167 :
2168 : /*
2169 : * Ensure that the passed location is the fileset's end.
2170 : */
2171 : static void
2172 8 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
2173 : off_t offset)
2174 : {
2175 : char path[MAXPGPATH];
2176 : BufFile *fd;
2177 : int last_fileno;
2178 : off_t last_offset;
2179 :
2180 : Assert(!IsTransactionState());
2181 :
2182 8 : begin_replication_step();
2183 :
2184 8 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2185 :
2186 8 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2187 :
2188 8 : BufFileSeek(fd, 0, 0, SEEK_END);
2189 8 : BufFileTell(fd, &last_fileno, &last_offset);
2190 :
2191 8 : BufFileClose(fd);
2192 :
2193 8 : end_replication_step();
2194 :
2195 8 : if (last_fileno != fileno || last_offset != offset)
2196 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2197 : path);
2198 8 : }
2199 :
2200 : /*
2201 : * Common spoolfile processing.
2202 : */
2203 : void
2204 62 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2205 : XLogRecPtr lsn)
2206 : {
2207 : int nchanges;
2208 : char path[MAXPGPATH];
2209 62 : char *buffer = NULL;
2210 : MemoryContext oldcxt;
2211 : ResourceOwner oldowner;
2212 : int fileno;
2213 : off_t offset;
2214 :
2215 62 : if (!am_parallel_apply_worker())
2216 54 : maybe_start_skipping_changes(lsn);
2217 :
2218 : /* Make sure we have an open transaction */
2219 62 : begin_replication_step();
2220 :
2221 : /*
2222 : * Allocate file handle and memory required to process all the messages in
2223 : * TopTransactionContext to avoid them getting reset after each message is
2224 : * processed.
2225 : */
2226 62 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2227 :
2228 : /* Open the spool file for the committed/prepared transaction */
2229 62 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2230 62 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2231 :
2232 : /*
2233 : * Make sure the file is owned by the toplevel transaction so that the
2234 : * file will not be accidentally closed when aborting a subtransaction.
2235 : */
2236 62 : oldowner = CurrentResourceOwner;
2237 62 : CurrentResourceOwner = TopTransactionResourceOwner;
2238 :
2239 62 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2240 :
2241 62 : CurrentResourceOwner = oldowner;
2242 :
2243 62 : buffer = palloc(BLCKSZ);
2244 :
2245 62 : MemoryContextSwitchTo(oldcxt);
2246 :
2247 62 : remote_final_lsn = lsn;
2248 :
2249 : /*
2250 : * Make sure the apply_dispatch handler methods are aware we're in a remote
2251 : * transaction.
2252 : */
2253 62 : in_remote_transaction = true;
2254 62 : pgstat_report_activity(STATE_RUNNING, NULL);
2255 :
2256 62 : end_replication_step();
2257 :
2258 : /*
2259 : * Read the entries one by one and pass them through the same logic as in
2260 : * apply_dispatch.
2261 : */
2262 62 : nchanges = 0;
2263 : while (true)
2264 176938 : {
2265 : StringInfoData s2;
2266 : size_t nbytes;
2267 : int len;
2268 :
2269 177000 : CHECK_FOR_INTERRUPTS();
2270 :
2271 : /* read length of the on-disk record */
2272 177000 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2273 :
2274 : /* have we reached end of the file? */
2275 177000 : if (nbytes == 0)
2276 52 : break;
2277 :
2278 : /* do we have a correct length? */
2279 176948 : if (len <= 0)
2280 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2281 : len, path);
2282 :
2283 : /* make sure we have sufficiently large buffer */
2284 176948 : buffer = repalloc(buffer, len);
2285 :
2286 : /* and finally read the data into the buffer */
2287 176948 : BufFileReadExact(stream_fd, buffer, len);
2288 :
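     : /* Remember the position after this change, for the last-message check below. */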
2289 176948 : BufFileTell(stream_fd, &fileno, &offset);
2290 :
2291 : /* init a stringinfo using the buffer and call apply_dispatch */
2292 176948 : initReadOnlyStringInfo(&s2, buffer, len);
2293 :
2294 : /* Ensure we are reading the data into our memory context. */
2295 176948 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2296 :
2297 176948 : apply_dispatch(&s2);
2298 :
2299 176946 : MemoryContextReset(ApplyMessageContext);
2300 :
2301 176946 : MemoryContextSwitchTo(oldcxt);
2302 :
2303 176946 : nchanges++;
2304 :
2305 : /*
2306 : * It is possible the file has been closed because we have processed
2307 : * a transaction end message such as stream_commit, in which case that
2308 : * must be the last message.
2309 : */
2310 176946 : if (!stream_fd)
2311 : {
2312 8 : ensure_last_message(stream_fileset, xid, fileno, offset);
2313 8 : break;
2314 : }
2315 :
2316 176938 : if (nchanges % 1000 == 0)
2317 166 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2318 : nchanges, path);
2319 : }
2320 :
2321 60 : if (stream_fd)
2322 52 : stream_close_file();
2323 :
2324 60 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2325 : nchanges, path);
2326 :
2327 60 : return;
2328 : }
2329 :
2330 : /*
2331 : * Handle STREAM COMMIT message.
2332 : */
2333 : static void
2334 122 : apply_handle_stream_commit(StringInfo s)
2335 : {
2336 : TransactionId xid;
2337 : LogicalRepCommitData commit_data;
2338 : ParallelApplyWorkerInfo *winfo;
2339 : TransApplyAction apply_action;
2340 :
2341 : /* Save the message before it is consumed. */
2342 122 : StringInfoData original_msg = *s;
2343 :
2344 122 : if (in_streamed_transaction)
2345 0 : ereport(ERROR,
2346 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2347 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2348 :
2349 122 : xid = logicalrep_read_stream_commit(s, &commit_data);
2350 122 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2351 :
2352 122 : apply_action = get_transaction_apply_action(xid, &winfo);
2353 :
2354 122 : switch (apply_action)
2355 : {
2356 44 : case TRANS_LEADER_APPLY:
2357 :
2358 : /*
2359 : * The transaction has been serialized to file, so replay all the
2360 : * spooled operations.
2361 : */
2362 44 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2363 : commit_data.commit_lsn);
2364 :
2365 42 : apply_handle_commit_internal(&commit_data);
2366 :
2367 : /* Unlink the files with serialized changes and subxact info. */
2368 42 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2369 :
2370 42 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2371 42 : break;
2372 :
2373 36 : case TRANS_LEADER_SEND_TO_PARALLEL:
2374 : Assert(winfo);
2375 :
2376 36 : if (pa_send_data(winfo, s->len, s->data))
2377 : {
2378 : /* Finish processing the streaming transaction. */
2379 36 : pa_xact_finish(winfo, commit_data.end_lsn);
2380 34 : break;
2381 : }
2382 :
2383 : /*
2384 : * Switch to serialize mode when we are not able to send the
2385 : * change to parallel apply worker.
2386 : */
2387 0 : pa_switch_to_partial_serialize(winfo, true);
2388 :
2389 : /* fall through */
2390 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2391 : Assert(winfo);
2392 :
2393 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2394 : &original_msg);
2395 :
2396 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2397 :
2398 : /* Finish processing the streaming transaction. */
2399 4 : pa_xact_finish(winfo, commit_data.end_lsn);
2400 4 : break;
2401 :
2402 38 : case TRANS_PARALLEL_APPLY:
2403 :
2404 : /*
2405 : * If the parallel apply worker is applying spooled messages then
2406 : * close the file before committing.
2407 : */
2408 38 : if (stream_fd)
2409 4 : stream_close_file();
2410 :
2411 38 : apply_handle_commit_internal(&commit_data);
2412 :
2413 38 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2414 :
2415 : /*
2416 : * It is important to set the transaction state as finished before
2417 : * releasing the lock. See pa_wait_for_xact_finish.
2418 : */
2419 38 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2420 38 : pa_unlock_transaction(xid, AccessExclusiveLock);
2421 :
2422 38 : pa_reset_subtrans();
2423 :
2424 38 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2425 38 : break;
2426 :
2427 0 : default:
2428 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2429 : break;
2430 : }
2431 :
2432 : /* Process any tables that are being synchronized in parallel. */
2433 118 : process_syncing_tables(commit_data.end_lsn);
2434 :
2435 118 : pgstat_report_activity(STATE_IDLE, NULL);
2436 :
2437 118 : reset_apply_error_context_info();
2438 118 : }
2439 :
2440 : /*
2441 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2442 : */
2443 : static void
2444 938 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2445 : {
2446 938 : if (is_skipping_changes())
2447 : {
2448 4 : stop_skipping_changes();
2449 :
2450 : /*
2451 : * Start a new transaction to clear the subskiplsn, if not started
2452 : * yet.
2453 : */
2454 4 : if (!IsTransactionState())
2455 2 : StartTransactionCommand();
2456 : }
2457 :
2458 938 : if (IsTransactionState())
2459 : {
2460 : /*
2461 : * The transaction is either non-empty or skipped, so we clear the
2462 : * subskiplsn.
2463 : */
2464 938 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2465 :
2466 : /*
2467 : * Update origin state so we can restart streaming from correct
2468 : * position in case of crash.
2469 : */
2470 938 : replorigin_session_origin_lsn = commit_data->end_lsn;
2471 938 : replorigin_session_origin_timestamp = commit_data->committime;
2472 :
2473 938 : CommitTransactionCommand();
2474 :
2475 938 : if (IsTransactionBlock())
2476 : {
2477 8 : EndTransactionBlock(false);
2478 8 : CommitTransactionCommand();
2479 : }
2480 :
2481 938 : pgstat_report_stat(false);
2482 :
2483 938 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2484 : }
2485 : else
2486 : {
2487 : /* Process any invalidation messages that might have accumulated. */
2488 0 : AcceptInvalidationMessages();
2489 0 : maybe_reread_subscription();
2490 : }
2491 :
2492 938 : in_remote_transaction = false;
2493 938 : }
2494 :
2495 : /*
2496 : * Handle RELATION message.
2497 : *
2498 : * Note we don't do validation against local schema here. The validation
2499 : * against local schema is postponed until first change for given relation
2500 : * comes as we only care about it when applying changes for it anyway and we
2501 : * do less locking this way.
2502 : */
2503 : static void
2504 882 : apply_handle_relation(StringInfo s)
2505 : {
2506 : LogicalRepRelation *rel;
2507 :
2508 882 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2509 70 : return;
2510 :
2511 812 : rel = logicalrep_read_rel(s);
2512 812 : logicalrep_relmap_update(rel);
2513 :
2514 : /* Also reset all entries in the partition map that refer to remoterel. */
2515 812 : logicalrep_partmap_reset_relmap(rel);
2516 : }
2517 :
2518 : /*
2519 : * Handle TYPE message.
2520 : *
2521 : * This implementation pays no attention to TYPE messages; we expect the user
2522 : * to have set things up so that the incoming data is acceptable to the input
2523 : * functions for the locally subscribed tables. Hence, we just read and
2524 : * discard the message.
2525 : */
2526 : static void
2527 36 : apply_handle_type(StringInfo s)
2528 : {
2529 : LogicalRepTyp typ;
2530 :
2531 36 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2532 0 : return;
2533 :
2534 36 : logicalrep_read_typ(s, &typ);
2535 : }
2536 :
2537 : /*
2538 : * Check that we (the subscription owner) have sufficient privileges on the
2539 : * target relation to perform the given operation.
2540 : */
2541 : static void
2542 440594 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2543 : {
2544 : Oid relid;
2545 : AclResult aclresult;
2546 :
2547 440594 : relid = RelationGetRelid(rel);
2548 440594 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2549 440594 : if (aclresult != ACLCHECK_OK)
2550 14 : aclcheck_error(aclresult,
2551 14 : get_relkind_objtype(rel->rd_rel->relkind),
2552 14 : get_rel_name(relid));
2553 :
2554 : /*
2555 : * We lack the infrastructure to honor RLS policies. It might be possible
2556 : * to add such infrastructure here, but tablesync workers lack it, too, so
2557 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2558 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2559 : * replicate subsequent INSERTs, so we forbid all commands the same.
2560 : */
2561 440580 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2562 6 : ereport(ERROR,
2563 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2564 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2565 : GetUserNameFromId(GetUserId(), true),
2566 : RelationGetRelationName(rel))));
2567 440574 : }
2568 :
2569 : /*
2570 : * Handle INSERT message.
2571 : */
2572 :
2573 : static void
2574 371696 : apply_handle_insert(StringInfo s)
2575 : {
2576 : LogicalRepRelMapEntry *rel;
2577 : LogicalRepTupleData newtup;
2578 : LogicalRepRelId relid;
2579 : UserContext ucxt;
2580 : ApplyExecutionData *edata;
2581 : EState *estate;
2582 : TupleTableSlot *remoteslot;
2583 : MemoryContext oldctx;
2584 : bool run_as_owner;
2585 :
2586 : /*
2587 : * Quick return if we are skipping data modification changes or handling
2588 : * streamed transactions.
2589 : */
2590 723390 : if (is_skipping_changes() ||
2591 351694 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2592 220136 : return;
2593 :
2594 151682 : begin_replication_step();
2595 :
2596 151680 : relid = logicalrep_read_insert(s, &newtup);
2597 151680 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2598 151668 : if (!should_apply_changes_for_rel(rel))
2599 : {
2600 : /*
2601 : * The relation can't become interesting in the middle of the
2602 : * transaction so it's safe to unlock it.
2603 : */
2604 122 : logicalrep_rel_close(rel, RowExclusiveLock);
2605 122 : end_replication_step();
2606 122 : return;
2607 : }
2608 :
2609 : /*
2610 : * Make sure that any user-supplied code runs as the table owner, unless
2611 : * the user has opted out of that behavior.
2612 : */
2613 151546 : run_as_owner = MySubscription->runasowner;
2614 151546 : if (!run_as_owner)
2615 151530 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2616 :
2617 : /* Set relation for error callback */
2618 151546 : apply_error_callback_arg.rel = rel;
2619 :
2620 : /* Initialize the executor state. */
2621 151546 : edata = create_edata_for_relation(rel);
2622 151546 : estate = edata->estate;
2623 151546 : remoteslot = ExecInitExtraTupleSlot(estate,
2624 151546 : RelationGetDescr(rel->localrel),
2625 : &TTSOpsVirtual);
2626 :
2627 : /* Process and store remote tuple in the slot */
2628 151546 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2629 151546 : slot_store_data(remoteslot, rel, &newtup);
2630 151546 : slot_fill_defaults(rel, estate, remoteslot);
2631 151546 : MemoryContextSwitchTo(oldctx);
2632 :
2633 : /* For a partitioned table, insert the tuple into a partition. */
2634 151546 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2635 100 : apply_handle_tuple_routing(edata,
2636 : remoteslot, NULL, CMD_INSERT);
2637 : else
2638 : {
2639 151446 : ResultRelInfo *relinfo = edata->targetRelInfo;
2640 :
2641 151446 : ExecOpenIndices(relinfo, false);
2642 151446 : apply_handle_insert_internal(edata, relinfo, remoteslot);
2643 151416 : ExecCloseIndices(relinfo);
2644 : }
2645 :
2646 151504 : finish_edata(edata);
2647 :
2648 : /* Reset relation for error callback */
2649 151504 : apply_error_callback_arg.rel = NULL;
2650 :
2651 151504 : if (!run_as_owner)
2652 151494 : RestoreUserContext(&ucxt);
2653 :
2654 151504 : logicalrep_rel_close(rel, NoLock);
2655 :
2656 151504 : end_replication_step();
2657 : }
2658 :
2659 : /*
2660 : * Workhorse for apply_handle_insert()
2661 : * relinfo is for the relation we're actually inserting into
2662 : * (could be a child partition of edata->targetRelInfo)
2663 : */
2664 : static void
2665 151548 : apply_handle_insert_internal(ApplyExecutionData *edata,
2666 : ResultRelInfo *relinfo,
2667 : TupleTableSlot *remoteslot)
2668 : {
2669 151548 : EState *estate = edata->estate;
2670 :
2671 : /* Caller should have opened indexes already. */
2672 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2673 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2674 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2675 :
2676 : /* Caller will not have done this bit. */
2677 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2678 151548 : InitConflictIndexes(relinfo);
2679 :
2680 : /* Do the insert. */
2681 151548 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2682 151536 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2683 151506 : }
2684 :
2685 : /*
2686 : * Check if the logical replication relation is updatable and throw
2687 : * appropriate error if it isn't.
2688 : */
2689 : static void
2690 144570 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2691 : {
2692 : /*
2693 : * For partitioned tables, we only need to care if the target partition is
2694 : * updatable (aka has PK or RI defined for it).
2695 : */
2696 144570 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2697 60 : return;
2698 :
2699 : /* Updatable, no error. */
2700 144510 : if (rel->updatable)
2701 144510 : return;
2702 :
2703 : /*
2704 : * We are in error mode, so it's fine that this is somewhat slow. It's
2705 : * better to give the user a correct error.
2706 : */
2707 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2708 : {
2709 0 : ereport(ERROR,
2710 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2711 : errmsg("publisher did not send replica identity column "
2712 : "expected by the logical replication target relation \"%s.%s\"",
2713 : rel->remoterel.nspname, rel->remoterel.relname)));
2714 : }
2715 :
2716 0 : ereport(ERROR,
2717 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2718 : errmsg("logical replication target relation \"%s.%s\" has "
2719 : "neither REPLICA IDENTITY index nor PRIMARY "
2720 : "KEY and published relation does not have "
2721 : "REPLICA IDENTITY FULL",
2722 : rel->remoterel.nspname, rel->remoterel.relname)));
2723 : }
2724 :
2725 : /*
2726 : * Handle UPDATE message.
2727 : *
2728 : * TODO: FDW support
2729 : */
2730 : static void
2731 132320 : apply_handle_update(StringInfo s)
2732 : {
2733 : LogicalRepRelMapEntry *rel;
2734 : LogicalRepRelId relid;
2735 : UserContext ucxt;
2736 : ApplyExecutionData *edata;
2737 : EState *estate;
2738 : LogicalRepTupleData oldtup;
2739 : LogicalRepTupleData newtup;
2740 : bool has_oldtup;
2741 : TupleTableSlot *remoteslot;
2742 : RTEPermissionInfo *target_perminfo;
2743 : MemoryContext oldctx;
2744 : bool run_as_owner;
2745 :
2746 : /*
2747 : * Quick return if we are skipping data modification changes or handling
2748 : * streamed transactions.
2749 : */
2750 264634 : if (is_skipping_changes() ||
2751 132314 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
2752 68446 : return;
2753 :
2754 63874 : begin_replication_step();
2755 :
2756 63872 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2757 : &newtup);
2758 63872 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2759 63872 : if (!should_apply_changes_for_rel(rel))
2760 : {
2761 : /*
2762 : * The relation can't become interesting in the middle of the
2763 : * transaction so it's safe to unlock it.
2764 : */
2765 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2766 0 : end_replication_step();
2767 0 : return;
2768 : }
2769 :
2770 : /* Set relation for error callback */
2771 63872 : apply_error_callback_arg.rel = rel;
2772 :
2773 : /* Check if we can do the update. */
2774 63872 : check_relation_updatable(rel);
2775 :
2776 : /*
2777 : * Make sure that any user-supplied code runs as the table owner, unless
2778 : * the user has opted out of that behavior.
2779 : */
2780 63872 : run_as_owner = MySubscription->runasowner;
2781 63872 : if (!run_as_owner)
2782 63866 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2783 :
2784 : /* Initialize the executor state. */
2785 63870 : edata = create_edata_for_relation(rel);
2786 63870 : estate = edata->estate;
2787 63870 : remoteslot = ExecInitExtraTupleSlot(estate,
2788 63870 : RelationGetDescr(rel->localrel),
2789 : &TTSOpsVirtual);
2790 :
2791 : /*
2792 : * Populate updatedCols so that per-column triggers can fire, and so that
2793 : * the executor can correctly pass down the indexUnchanged hint. This could
2794 : * include more columns than were actually changed on the publisher
2795 : * because the logical replication protocol doesn't contain that
2796 : * information. But it would for example exclude columns that only exist
2797 : * on the subscriber, since we are not touching those.
2798 : */
2799 63870 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
2800 318614 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2801 : {
2802 254744 : Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
2803 254744 : int remoteattnum = rel->attrmap->attnums[i];
2804 :
2805 254744 : if (!att->attisdropped && remoteattnum >= 0)
2806 : {
2807 : Assert(remoteattnum < newtup.ncols);
2808 137710 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2809 137704 : target_perminfo->updatedCols =
2810 137704 : bms_add_member(target_perminfo->updatedCols,
2811 : i + 1 - FirstLowInvalidHeapAttributeNumber);
2812 : }
2813 : }
2814 :
2815 : /* Build the search tuple. */
2816 63870 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2817 63870 : slot_store_data(remoteslot, rel,
2818 63870 : has_oldtup ? &oldtup : &newtup);
2819 63870 : MemoryContextSwitchTo(oldctx);
2820 :
2821 : /* For a partitioned table, apply update to correct partition. */
2822 63870 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2823 26 : apply_handle_tuple_routing(edata,
2824 : remoteslot, &newtup, CMD_UPDATE);
2825 : else
2826 63844 : apply_handle_update_internal(edata, edata->targetRelInfo,
2827 : remoteslot, &newtup, rel->localindexoid);
2828 :
2829 63858 : finish_edata(edata);
2830 :
2831 : /* Reset relation for error callback */
2832 63858 : apply_error_callback_arg.rel = NULL;
2833 :
2834 63858 : if (!run_as_owner)
2835 63854 : RestoreUserContext(&ucxt);
2836 :
2837 63858 : logicalrep_rel_close(rel, NoLock);
2838 :
2839 63858 : end_replication_step();
2840 : }
2841 :
2842 : /*
2843 : * Workhorse for apply_handle_update()
2844 : * relinfo is for the relation we're actually updating in
2845 : * (could be a child partition of edata->targetRelInfo)
2846 : */
2847 : static void
2848 63844 : apply_handle_update_internal(ApplyExecutionData *edata,
2849 : ResultRelInfo *relinfo,
2850 : TupleTableSlot *remoteslot,
2851 : LogicalRepTupleData *newtup,
2852 : Oid localindexoid)
2853 : {
2854 63844 : EState *estate = edata->estate;
2855 63844 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2856 63844 : Relation localrel = relinfo->ri_RelationDesc;
2857 : EPQState epqstate;
2858 63844 : TupleTableSlot *localslot = NULL;
2859 63844 : ConflictTupleInfo conflicttuple = {0};
2860 : bool found;
2861 : MemoryContext oldctx;
2862 :
2863 63844 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2864 63844 : ExecOpenIndices(relinfo, false);
2865 :
2866 63844 : found = FindReplTupleInLocalRel(edata, localrel,
2867 : &relmapentry->remoterel,
2868 : localindexoid,
2869 : remoteslot, &localslot);
2870 :
2871 : /*
2872 : * Tuple found.
2873 : *
2874 : * Note this will fail if there are other conflicting unique indexes.
2875 : */
2876 63836 : if (found)
2877 : {
2878 : /*
2879 : * Report the conflict if the tuple was modified by a different
2880 : * origin.
2881 : */
2882 63826 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2883 4 : &conflicttuple.origin, &conflicttuple.ts) &&
2884 4 : conflicttuple.origin != replorigin_session_origin)
2885 : {
2886 : TupleTableSlot *newslot;
2887 :
2888 : /* Store the new tuple for conflict reporting */
2889 4 : newslot = table_slot_create(localrel, &estate->es_tupleTable);
2890 4 : slot_store_data(newslot, relmapentry, newtup);
2891 :
2892 4 : conflicttuple.slot = localslot;
2893 :
2894 4 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
2895 : remoteslot, newslot,
2896 4 : list_make1(&conflicttuple));
2897 : }
2898 :
2899 : /* Process and store remote tuple in the slot */
2900 63826 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2901 63826 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2902 63826 : MemoryContextSwitchTo(oldctx);
2903 :
2904 63826 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2905 :
2906 63826 : InitConflictIndexes(relinfo);
2907 :
2908 : /* Do the actual update. */
2909 63826 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2910 63826 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2911 : remoteslot);
2912 : }
2913 : else
2914 : {
2915 10 : TupleTableSlot *newslot = localslot;
2916 :
2917 : /* Store the new tuple for conflict reporting */
2918 10 : slot_store_data(newslot, relmapentry, newtup);
2919 :
2920 : /*
2921 : * The tuple to be updated could not be found. Do nothing except for
2922 : * emitting a log message.
2923 : */
2924 10 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
2925 10 : remoteslot, newslot, list_make1(&conflicttuple));
2926 : }
2927 :
2928 : /* Cleanup. */
2929 63832 : ExecCloseIndices(relinfo);
2930 63832 : EvalPlanQualEnd(&epqstate);
2931 63832 : }
2932 :
2933 : /*
2934 : * Handle DELETE message.
2935 : *
2936 : * TODO: FDW support
2937 : */
2938 : static void
2939 163868 : apply_handle_delete(StringInfo s)
2940 : {
2941 : LogicalRepRelMapEntry *rel;
2942 : LogicalRepTupleData oldtup;
2943 : LogicalRepRelId relid;
2944 : UserContext ucxt;
2945 : ApplyExecutionData *edata;
2946 : EState *estate;
2947 : TupleTableSlot *remoteslot;
2948 : MemoryContext oldctx;
2949 : bool run_as_owner;
2950 :
2951 : /*
2952 : * Quick return if we are skipping data modification changes or handling
2953 : * streamed transactions.
2954 : */
2955 327736 : if (is_skipping_changes() ||
2956 163868 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
2957 83230 : return;
2958 :
2959 80638 : begin_replication_step();
2960 :
2961 80638 : relid = logicalrep_read_delete(s, &oldtup);
2962 80638 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2963 80638 : if (!should_apply_changes_for_rel(rel))
2964 : {
2965 : /*
2966 : * The relation can't become interesting in the middle of the
2967 : * transaction so it's safe to unlock it.
2968 : */
2969 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2970 0 : end_replication_step();
2971 0 : return;
2972 : }
2973 :
2974 : /* Set relation for error callback */
2975 80638 : apply_error_callback_arg.rel = rel;
2976 :
2977 : /* Check if we can do the delete. */
2978 80638 : check_relation_updatable(rel);
2979 :
2980 : /*
2981 : * Make sure that any user-supplied code runs as the table owner, unless
2982 : * the user has opted out of that behavior.
2983 : */
2984 80638 : run_as_owner = MySubscription->runasowner;
2985 80638 : if (!run_as_owner)
2986 80634 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2987 :
2988 : /* Initialize the executor state. */
2989 80638 : edata = create_edata_for_relation(rel);
2990 80638 : estate = edata->estate;
2991 80638 : remoteslot = ExecInitExtraTupleSlot(estate,
2992 80638 : RelationGetDescr(rel->localrel),
2993 : &TTSOpsVirtual);
2994 :
2995 : /* Build the search tuple. */
2996 80638 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2997 80638 : slot_store_data(remoteslot, rel, &oldtup);
2998 80638 : MemoryContextSwitchTo(oldctx);
2999 :
3000 : /* For a partitioned table, apply delete to correct partition. */
3001 80638 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3002 34 : apply_handle_tuple_routing(edata,
3003 : remoteslot, NULL, CMD_DELETE);
3004 : else
3005 : {
3006 80604 : ResultRelInfo *relinfo = edata->targetRelInfo;
3007 :
3008 80604 : ExecOpenIndices(relinfo, false);
3009 80604 : apply_handle_delete_internal(edata, relinfo,
3010 : remoteslot, rel->localindexoid);
3011 80604 : ExecCloseIndices(relinfo);
3012 : }
3013 :
3014 80638 : finish_edata(edata);
3015 :
3016 : /* Reset relation for error callback */
3017 80638 : apply_error_callback_arg.rel = NULL;
3018 :
3019 80638 : if (!run_as_owner)
3020 80634 : RestoreUserContext(&ucxt);
3021 :
3022 80638 : logicalrep_rel_close(rel, NoLock);
3023 :
3024 80638 : end_replication_step();
3025 : }
3026 :
3027 : /*
3028 : * Workhorse for apply_handle_delete()
3029 : * relinfo is for the relation we're actually deleting from
3030 : * (could be a child partition of edata->targetRelInfo)
3031 : */
3032 : static void
3033 80638 : apply_handle_delete_internal(ApplyExecutionData *edata,
3034 : ResultRelInfo *relinfo,
3035 : TupleTableSlot *remoteslot,
3036 : Oid localindexoid)
3037 : {
3038 80638 : EState *estate = edata->estate;
3039 80638 : Relation localrel = relinfo->ri_RelationDesc;
3040 80638 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
3041 : EPQState epqstate;
3042 : TupleTableSlot *localslot;
3043 80638 : ConflictTupleInfo conflicttuple = {0};
3044 : bool found;
3045 :
3046 80638 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3047 :
3048 : /* Caller should have opened indexes already. */
3049 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
3050 : !localrel->rd_rel->relhasindex ||
3051 : RelationGetIndexList(localrel) == NIL);
3052 :
3053 80638 : found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
3054 : remoteslot, &localslot);
3055 :
3056 : /* If found delete it. */
3057 80638 : if (found)
3058 : {
3059 : /*
3060 : * Report the conflict if the tuple was modified by a different
3061 : * origin.
3062 : */
3063 80620 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3064 8 : &conflicttuple.origin, &conflicttuple.ts) &&
3065 8 : conflicttuple.origin != replorigin_session_origin)
3066 : {
3067 6 : conflicttuple.slot = localslot;
3068 6 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
3069 : remoteslot, NULL,
3070 6 : list_make1(&conflicttuple));
3071 : }
3072 :
3073 80620 : EvalPlanQualSetSlot(&epqstate, localslot);
3074 :
3075 : /* Do the actual delete. */
3076 80620 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
3077 80620 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
3078 : }
3079 : else
3080 : {
3081 : /*
3082 : * The tuple to be deleted could not be found. Do nothing except for
3083 : * emitting a log message.
3084 : */
3085 18 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
3086 18 : remoteslot, NULL, list_make1(&conflicttuple));
3087 : }
3088 :
3089 : /* Cleanup. */
3090 80638 : EvalPlanQualEnd(&epqstate);
3091 80638 : }
3092 :
3093 : /*
3094 : * Try to find a tuple received from the publication side (in 'remoteslot') in
3095 : * the corresponding local relation using either replica identity index,
        3096             :  * primary key, another usable index, or, if needed, a sequential scan.
3097 : *
3098 : * Local tuple, if found, is returned in '*localslot'.
3099 : */
3100 : static bool
3101 144508 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
3102 : LogicalRepRelation *remoterel,
3103 : Oid localidxoid,
3104 : TupleTableSlot *remoteslot,
3105 : TupleTableSlot **localslot)
3106 : {
3107 144508 : EState *estate = edata->estate;
3108 : bool found;
3109 :
3110 : /*
3111 : * Regardless of the top-level operation, we're performing a read here, so
3112 : * check for SELECT privileges.
3113 : */
3114 144508 : TargetPrivilegesCheck(localrel, ACL_SELECT);
3115 :
3116 144500 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
3117 :
3118 : Assert(OidIsValid(localidxoid) ||
3119 : (remoterel->replident == REPLICA_IDENTITY_FULL));
3120 :
3121 144500 : if (OidIsValid(localidxoid))
3122 : {
3123 : #ifdef USE_ASSERT_CHECKING
3124 : Relation idxrel = index_open(localidxoid, AccessShareLock);
3125 :
3126 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
3127 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
3128 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
3129 : IsIndexUsableForReplicaIdentityFull(idxrel,
3130 : edata->targetRel->attrmap)));
3131 : index_close(idxrel, AccessShareLock);
3132 : #endif
3133 :
3134 144202 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
3135 : LockTupleExclusive,
3136 : remoteslot, *localslot);
3137 : }
3138 : else
3139 298 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
3140 : remoteslot, *localslot);
3141 :
3142 144500 : return found;
3143 : }
3144 :
3145 : /*
3146 : * This handles insert, update, delete on a partitioned table.
3147 : */
3148 : static void
3149 160 : apply_handle_tuple_routing(ApplyExecutionData *edata,
3150 : TupleTableSlot *remoteslot,
3151 : LogicalRepTupleData *newtup,
3152 : CmdType operation)
3153 : {
3154 160 : EState *estate = edata->estate;
3155 160 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
3156 160 : ResultRelInfo *relinfo = edata->targetRelInfo;
3157 160 : Relation parentrel = relinfo->ri_RelationDesc;
3158 : ModifyTableState *mtstate;
3159 : PartitionTupleRouting *proute;
3160 : ResultRelInfo *partrelinfo;
3161 : Relation partrel;
3162 : TupleTableSlot *remoteslot_part;
3163 : TupleConversionMap *map;
3164 : MemoryContext oldctx;
3165 160 : LogicalRepRelMapEntry *part_entry = NULL;
3166 160 : AttrMap *attrmap = NULL;
3167 :
3168 : /* ModifyTableState is needed for ExecFindPartition(). */
3169 160 : edata->mtstate = mtstate = makeNode(ModifyTableState);
3170 160 : mtstate->ps.plan = NULL;
3171 160 : mtstate->ps.state = estate;
3172 160 : mtstate->operation = operation;
3173 160 : mtstate->resultRelInfo = relinfo;
3174 :
3175 : /* ... as is PartitionTupleRouting. */
3176 160 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
3177 :
3178 : /*
3179 : * Find the partition to which the "search tuple" belongs.
3180 : */
3181 : Assert(remoteslot != NULL);
3182 160 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3183 160 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3184 : remoteslot, estate);
3185 : Assert(partrelinfo != NULL);
3186 160 : partrel = partrelinfo->ri_RelationDesc;
3187 :
3188 : /*
3189 : * Check for supported relkind. We need this since partitions might be of
3190 : * unsupported relkinds; and the set of partitions can change, so checking
3191 : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3192 : */
3193 160 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3194 160 : get_namespace_name(RelationGetNamespace(partrel)),
3195 160 : RelationGetRelationName(partrel));
3196 :
3197 : /*
3198 : * To perform any of the operations below, the tuple must match the
3199 : * partition's rowtype. Convert if needed or just copy, using a dedicated
3200 : * slot to store the tuple in any case.
3201 : */
3202 160 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3203 160 : if (remoteslot_part == NULL)
3204 94 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3205 160 : map = ExecGetRootToChildMap(partrelinfo, estate);
3206 160 : if (map != NULL)
3207 : {
3208 66 : attrmap = map->attrMap;
3209 66 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3210 : remoteslot_part);
3211 : }
3212 : else
3213 : {
3214 94 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3215 94 : slot_getallattrs(remoteslot_part);
3216 : }
3217 160 : MemoryContextSwitchTo(oldctx);
3218 :
3219 : /* Check if we can do the update or delete on the leaf partition. */
3220 160 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
3221 : {
3222 60 : part_entry = logicalrep_partition_open(relmapentry, partrel,
3223 : attrmap);
3224 60 : check_relation_updatable(part_entry);
3225 : }
3226 :
3227 160 : switch (operation)
3228 : {
3229 100 : case CMD_INSERT:
3230 100 : apply_handle_insert_internal(edata, partrelinfo,
3231 : remoteslot_part);
3232 88 : break;
3233 :
3234 34 : case CMD_DELETE:
3235 34 : apply_handle_delete_internal(edata, partrelinfo,
3236 : remoteslot_part,
3237 : part_entry->localindexoid);
3238 34 : break;
3239 :
3240 26 : case CMD_UPDATE:
3241 :
3242 : /*
3243 : * For UPDATE, depending on whether or not the updated tuple
3244 : * satisfies the partition's constraint, perform a simple UPDATE
3245 : * of the partition or move the updated tuple into a different
3246 : * suitable partition.
3247 : */
3248 : {
3249 : TupleTableSlot *localslot;
3250 : ResultRelInfo *partrelinfo_new;
3251 : Relation partrel_new;
3252 : bool found;
3253 : EPQState epqstate;
3254 26 : ConflictTupleInfo conflicttuple = {0};
3255 :
3256 : /* Get the matching local tuple from the partition. */
3257 26 : found = FindReplTupleInLocalRel(edata, partrel,
3258 : &part_entry->remoterel,
3259 : part_entry->localindexoid,
3260 : remoteslot_part, &localslot);
3261 26 : if (!found)
3262 : {
3263 4 : TupleTableSlot *newslot = localslot;
3264 :
3265 : /* Store the new tuple for conflict reporting */
3266 4 : slot_store_data(newslot, part_entry, newtup);
3267 :
3268 : /*
3269 : * The tuple to be updated could not be found. Do nothing
3270 : * except for emitting a log message.
3271 : */
3272 4 : ReportApplyConflict(estate, partrelinfo, LOG,
3273 : CT_UPDATE_MISSING, remoteslot_part,
3274 4 : newslot, list_make1(&conflicttuple));
3275 :
3276 4 : return;
3277 : }
3278 :
3279 : /*
3280 : * Report the conflict if the tuple was modified by a
3281 : * different origin.
3282 : */
3283 22 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3284 : &conflicttuple.origin,
3285 2 : &conflicttuple.ts) &&
3286 2 : conflicttuple.origin != replorigin_session_origin)
3287 : {
3288 : TupleTableSlot *newslot;
3289 :
3290 : /* Store the new tuple for conflict reporting */
3291 2 : newslot = table_slot_create(partrel, &estate->es_tupleTable);
3292 2 : slot_store_data(newslot, part_entry, newtup);
3293 :
3294 2 : conflicttuple.slot = localslot;
3295 :
3296 2 : ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3297 : remoteslot_part, newslot,
3298 2 : list_make1(&conflicttuple));
3299 : }
3300 :
3301 : /*
3302 : * Apply the update to the local tuple, putting the result in
3303 : * remoteslot_part.
3304 : */
3305 22 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3306 22 : slot_modify_data(remoteslot_part, localslot, part_entry,
3307 : newtup);
3308 22 : MemoryContextSwitchTo(oldctx);
3309 :
3310 22 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3311 :
3312 : /*
3313 : * Does the updated tuple still satisfy the current
3314 : * partition's constraint?
3315 : */
3316 44 : if (!partrel->rd_rel->relispartition ||
3317 22 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3318 : false))
3319 : {
3320 : /*
3321 : * Yes, so simply UPDATE the partition. We don't call
3322 : * apply_handle_update_internal() here, which would
3323 : * normally do the following work, to avoid repeating some
3324 : * work already done above to find the local tuple in the
3325 : * partition.
3326 : */
3327 20 : InitConflictIndexes(partrelinfo);
3328 :
3329 20 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3330 20 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3331 : ACL_UPDATE);
3332 20 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3333 : localslot, remoteslot_part);
3334 : }
3335 : else
3336 : {
3337 : /* Move the tuple into the new partition. */
3338 :
3339 : /*
3340 : * New partition will be found using tuple routing, which
3341 : * can only occur via the parent table. We might need to
3342 : * convert the tuple to the parent's rowtype. Note that
3343 : * this is the tuple found in the partition, not the
3344 : * original search tuple received by this function.
3345 : */
3346 2 : if (map)
3347 : {
3348 : TupleConversionMap *PartitionToRootMap =
3349 2 : convert_tuples_by_name(RelationGetDescr(partrel),
3350 : RelationGetDescr(parentrel));
3351 :
3352 : remoteslot =
3353 2 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3354 : remoteslot_part, remoteslot);
3355 : }
3356 : else
3357 : {
3358 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3359 0 : slot_getallattrs(remoteslot);
3360 : }
3361 :
3362 : /* Find the new partition. */
3363 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3364 2 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3365 : proute, remoteslot,
3366 : estate);
3367 2 : MemoryContextSwitchTo(oldctx);
3368 : Assert(partrelinfo_new != partrelinfo);
3369 2 : partrel_new = partrelinfo_new->ri_RelationDesc;
3370 :
3371 : /* Check that new partition also has supported relkind. */
3372 2 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3373 2 : get_namespace_name(RelationGetNamespace(partrel_new)),
3374 2 : RelationGetRelationName(partrel_new));
3375 :
3376 : /* DELETE old tuple found in the old partition. */
3377 2 : EvalPlanQualSetSlot(&epqstate, localslot);
3378 2 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3379 2 : ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3380 :
3381 : /* INSERT new tuple into the new partition. */
3382 :
3383 : /*
3384 : * Convert the replacement tuple to match the destination
3385 : * partition rowtype.
3386 : */
3387 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3388 2 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3389 2 : if (remoteslot_part == NULL)
3390 2 : remoteslot_part = table_slot_create(partrel_new,
3391 : &estate->es_tupleTable);
3392 2 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
3393 2 : if (map != NULL)
3394 : {
3395 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3396 : remoteslot,
3397 : remoteslot_part);
3398 : }
3399 : else
3400 : {
3401 2 : remoteslot_part = ExecCopySlot(remoteslot_part,
3402 : remoteslot);
3403 2 : slot_getallattrs(remoteslot);
3404 : }
3405 2 : MemoryContextSwitchTo(oldctx);
3406 2 : apply_handle_insert_internal(edata, partrelinfo_new,
3407 : remoteslot_part);
3408 : }
3409 :
3410 22 : EvalPlanQualEnd(&epqstate);
3411 : }
3412 22 : break;
3413 :
3414 0 : default:
3415 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3416 : break;
3417 : }
3418 : }
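              /*
               * Editor's sketch (not part of worker.c): the CMD_UPDATE branch above, in
               * outline, using the same variable names:
               *
               *    find the local tuple in the routed partition (FindReplTupleInLocalRel)
               *    if (updated tuple still satisfies the partition's constraint)
               *        ExecSimpleRelationUpdate(partrelinfo, ...);      // update in place
               *    else
               *    {
               *        ExecSimpleRelationDelete(partrelinfo, ...);      // old partition
               *        partrelinfo_new = ExecFindPartition(...);        // re-route via parent
               *        apply_handle_insert_internal(edata, partrelinfo_new, ...);
               *    }
               *
               * That is, a cross-partition update is replayed as a DELETE from the old
               * partition followed by an INSERT into the newly routed one.
               */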
3419 :
3420 : /*
3421 : * Handle TRUNCATE message.
3422 : *
3423 : * TODO: FDW support
3424 : */
3425 : static void
3426 38 : apply_handle_truncate(StringInfo s)
3427 : {
3428 38 : bool cascade = false;
3429 38 : bool restart_seqs = false;
3430 38 : List *remote_relids = NIL;
3431 38 : List *remote_rels = NIL;
3432 38 : List *rels = NIL;
3433 38 : List *part_rels = NIL;
3434 38 : List *relids = NIL;
3435 38 : List *relids_logged = NIL;
3436 : ListCell *lc;
3437 38 : LOCKMODE lockmode = AccessExclusiveLock;
3438 :
3439 : /*
3440 : * Quick return if we are skipping data modification changes or handling
3441 : * streamed transactions.
3442 : */
3443 76 : if (is_skipping_changes() ||
3444 38 : handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
3445 0 : return;
3446 :
3447 38 : begin_replication_step();
3448 :
3449 38 : remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3450 :
3451 94 : foreach(lc, remote_relids)
3452 : {
3453 56 : LogicalRepRelId relid = lfirst_oid(lc);
3454 : LogicalRepRelMapEntry *rel;
3455 :
3456 56 : rel = logicalrep_rel_open(relid, lockmode);
3457 56 : if (!should_apply_changes_for_rel(rel))
3458 : {
3459 : /*
3460 : * The relation can't become interesting in the middle of the
3461 : * transaction so it's safe to unlock it.
3462 : */
3463 0 : logicalrep_rel_close(rel, lockmode);
3464 0 : continue;
3465 : }
3466 :
3467 56 : remote_rels = lappend(remote_rels, rel);
3468 56 : TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
3469 56 : rels = lappend(rels, rel->localrel);
3470 56 : relids = lappend_oid(relids, rel->localreloid);
3471 56 : if (RelationIsLogicallyLogged(rel->localrel))
3472 0 : relids_logged = lappend_oid(relids_logged, rel->localreloid);
3473 :
3474 : /*
3475 : * Truncate partitions if we got a message to truncate a partitioned
3476 : * table.
3477 : */
3478 56 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3479 : {
3480 : ListCell *child;
3481 8 : List *children = find_all_inheritors(rel->localreloid,
3482 : lockmode,
3483 : NULL);
3484 :
3485 30 : foreach(child, children)
3486 : {
3487 22 : Oid childrelid = lfirst_oid(child);
3488 : Relation childrel;
3489 :
3490 22 : if (list_member_oid(relids, childrelid))
3491 8 : continue;
3492 :
3493 : /* find_all_inheritors already got lock */
3494 14 : childrel = table_open(childrelid, NoLock);
3495 :
3496 : /*
3497 : * Ignore temp tables of other backends. See similar code in
3498 : * ExecuteTruncate().
3499 : */
3500 14 : if (RELATION_IS_OTHER_TEMP(childrel))
3501 : {
3502 0 : table_close(childrel, lockmode);
3503 0 : continue;
3504 : }
3505 :
3506 14 : TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
3507 14 : rels = lappend(rels, childrel);
3508 14 : part_rels = lappend(part_rels, childrel);
3509 14 : relids = lappend_oid(relids, childrelid);
3510 : /* Log this relation only if needed for logical decoding */
3511 14 : if (RelationIsLogicallyLogged(childrel))
3512 0 : relids_logged = lappend_oid(relids_logged, childrelid);
3513 : }
3514 : }
3515 : }
3516 :
3517 : /*
        3518             :      * Even if CASCADE was used on the upstream primary, we explicitly default
        3519             :      * to replaying changes without further cascading. This might later be
        3520             :      * made configurable with a user-specified option.
3521 : *
3522 : * MySubscription->runasowner tells us whether we want to execute
3523 : * replication actions as the subscription owner; the last argument to
3524 : * TruncateGuts tells it whether we want to switch to the table owner.
3525 : * Those are exactly opposite conditions.
3526 : */
3527 38 : ExecuteTruncateGuts(rels,
3528 : relids,
3529 : relids_logged,
3530 : DROP_RESTRICT,
3531 : restart_seqs,
3532 38 : !MySubscription->runasowner);
3533 94 : foreach(lc, remote_rels)
3534 : {
3535 56 : LogicalRepRelMapEntry *rel = lfirst(lc);
3536 :
3537 56 : logicalrep_rel_close(rel, NoLock);
3538 : }
3539 52 : foreach(lc, part_rels)
3540 : {
3541 14 : Relation rel = lfirst(lc);
3542 :
3543 14 : table_close(rel, NoLock);
3544 : }
3545 :
3546 38 : end_replication_step();
3547 : }
3548 :
3549 :
3550 : /*
3551 : * Logical replication protocol message dispatcher.
3552 : */
3553 : void
3554 674322 : apply_dispatch(StringInfo s)
3555 : {
3556 674322 : LogicalRepMsgType action = pq_getmsgbyte(s);
3557 : LogicalRepMsgType saved_command;
3558 :
3559 : /*
3560 : * Set the current command being applied. Since this function can be
3561 : * called recursively when applying spooled changes, save the current
3562 : * command.
3563 : */
3564 674322 : saved_command = apply_error_callback_arg.command;
3565 674322 : apply_error_callback_arg.command = action;
3566 :
3567 674322 : switch (action)
3568 : {
3569 924 : case LOGICAL_REP_MSG_BEGIN:
3570 924 : apply_handle_begin(s);
3571 924 : break;
3572 :
3573 858 : case LOGICAL_REP_MSG_COMMIT:
3574 858 : apply_handle_commit(s);
3575 858 : break;
3576 :
3577 371696 : case LOGICAL_REP_MSG_INSERT:
3578 371696 : apply_handle_insert(s);
3579 371640 : break;
3580 :
3581 132320 : case LOGICAL_REP_MSG_UPDATE:
3582 132320 : apply_handle_update(s);
3583 132304 : break;
3584 :
3585 163868 : case LOGICAL_REP_MSG_DELETE:
3586 163868 : apply_handle_delete(s);
3587 163868 : break;
3588 :
3589 38 : case LOGICAL_REP_MSG_TRUNCATE:
3590 38 : apply_handle_truncate(s);
3591 38 : break;
3592 :
3593 882 : case LOGICAL_REP_MSG_RELATION:
3594 882 : apply_handle_relation(s);
3595 882 : break;
3596 :
3597 36 : case LOGICAL_REP_MSG_TYPE:
3598 36 : apply_handle_type(s);
3599 36 : break;
3600 :
3601 18 : case LOGICAL_REP_MSG_ORIGIN:
3602 18 : apply_handle_origin(s);
3603 18 : break;
3604 :
3605 0 : case LOGICAL_REP_MSG_MESSAGE:
3606 :
3607 : /*
3608 : * Logical replication does not use generic logical messages yet.
        3609             :              * However, they could be used by other applications that use this
3610 : * output plugin.
3611 : */
3612 0 : break;
3613 :
3614 1676 : case LOGICAL_REP_MSG_STREAM_START:
3615 1676 : apply_handle_stream_start(s);
3616 1676 : break;
3617 :
3618 1674 : case LOGICAL_REP_MSG_STREAM_STOP:
3619 1674 : apply_handle_stream_stop(s);
3620 1670 : break;
3621 :
3622 76 : case LOGICAL_REP_MSG_STREAM_ABORT:
3623 76 : apply_handle_stream_abort(s);
3624 76 : break;
3625 :
3626 122 : case LOGICAL_REP_MSG_STREAM_COMMIT:
3627 122 : apply_handle_stream_commit(s);
3628 118 : break;
3629 :
3630 32 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3631 32 : apply_handle_begin_prepare(s);
3632 32 : break;
3633 :
3634 30 : case LOGICAL_REP_MSG_PREPARE:
3635 30 : apply_handle_prepare(s);
3636 28 : break;
3637 :
3638 40 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
3639 40 : apply_handle_commit_prepared(s);
3640 40 : break;
3641 :
3642 10 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
3643 10 : apply_handle_rollback_prepared(s);
3644 10 : break;
3645 :
3646 22 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3647 22 : apply_handle_stream_prepare(s);
3648 22 : break;
3649 :
3650 0 : default:
3651 0 : ereport(ERROR,
3652 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3653 : errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3654 : }
3655 :
3656 : /* Reset the current command */
3657 674240 : apply_error_callback_arg.command = saved_command;
3658 674240 : }
3659 :
3660 : /*
3661 : * Figure out which write/flush positions to report to the walsender process.
3662 : *
3663 : * We can't simply report back the last LSN the walsender sent us because the
3664 : * local transaction might not yet be flushed to disk locally. Instead we
3665 : * build a list that associates local with remote LSNs for every commit. When
3666 : * reporting back the flush position to the sender we iterate that list and
3667 : * check which entries on it are already locally flushed. Those we can report
3668 : * as having been flushed.
3669 : *
        3670             :  * *have_pending_txes is set to true if there are outstanding transactions
        3671             :  * that need to be flushed.
3672 : */
3673 : static void
3674 125282 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3675 : bool *have_pending_txes)
3676 : {
3677 : dlist_mutable_iter iter;
3678 125282 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3679 :
3680 125282 : *write = InvalidXLogRecPtr;
3681 125282 : *flush = InvalidXLogRecPtr;
3682 :
3683 126230 : dlist_foreach_modify(iter, &lsn_mapping)
3684 : {
3685 16736 : FlushPosition *pos =
3686 16736 : dlist_container(FlushPosition, node, iter.cur);
3687 :
3688 16736 : *write = pos->remote_end;
3689 :
3690 16736 : if (pos->local_end <= local_flush)
3691 : {
3692 948 : *flush = pos->remote_end;
3693 948 : dlist_delete(iter.cur);
3694 948 : pfree(pos);
3695 : }
3696 : else
3697 : {
3698 : /*
3699 : * Don't want to uselessly iterate over the rest of the list which
3700 : * could potentially be long. Instead get the last element and
3701 : * grab the write position from there.
3702 : */
3703 15788 : pos = dlist_tail_element(FlushPosition, node,
3704 : &lsn_mapping);
3705 15788 : *write = pos->remote_end;
3706 15788 : *have_pending_txes = true;
3707 15788 : return;
3708 : }
3709 : }
3710 :
3711 109494 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3712 : }
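              /*
               * Editor's sketch (not part of worker.c): a worked example of the function
               * above, with hypothetical LSNs.  Suppose store_flush_position() has queued
               *
               *    { remote_end = 0/1500, local_end = 0/A000 }
               *    { remote_end = 0/1600, local_end = 0/B000 }
               *
               * and GetFlushRecPtr() reports a local flush position of 0/A800.  The first
               * entry is locally flushed (A000 <= A800), so *flush advances to 0/1500 and
               * the entry is discarded; the second is not yet flushed, so *write is taken
               * from the list tail (0/1600) and *have_pending_txes is set.  The publisher
               * is therefore told flush = 0/1500 even though data up to 0/1600 has been
               * received.
               */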
3713 :
3714 : /*
3715 : * Store current remote/local lsn pair in the tracking list.
3716 : */
3717 : void
3718 1070 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3719 : {
3720 : FlushPosition *flushpos;
3721 :
3722 : /*
3723 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3724 : * by the leader apply worker.
3725 : */
3726 1070 : if (am_parallel_apply_worker())
3727 38 : return;
3728 :
3729 : /* Need to do this in permanent context */
3730 1032 : MemoryContextSwitchTo(ApplyContext);
3731 :
3732 : /* Track commit lsn */
3733 1032 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3734 1032 : flushpos->local_end = local_lsn;
3735 1032 : flushpos->remote_end = remote_lsn;
3736 :
3737 1032 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3738 1032 : MemoryContextSwitchTo(ApplyMessageContext);
3739 : }
3740 :
3741 :
3742 : /* Update statistics of the worker. */
3743 : static void
3744 372860 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3745 : {
3746 372860 : MyLogicalRepWorker->last_lsn = last_lsn;
3747 372860 : MyLogicalRepWorker->last_send_time = send_time;
3748 372860 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3749 372860 : if (reply)
3750 : {
3751 3192 : MyLogicalRepWorker->reply_lsn = last_lsn;
3752 3192 : MyLogicalRepWorker->reply_time = send_time;
3753 : }
3754 372860 : }
3755 :
3756 : /*
3757 : * Apply main loop.
3758 : */
3759 : static void
3760 770 : LogicalRepApplyLoop(XLogRecPtr last_received)
3761 : {
3762 770 : TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3763 770 : bool ping_sent = false;
3764 : TimeLineID tli;
3765 : ErrorContextCallback errcallback;
3766 770 : RetainDeadTuplesData rdt_data = {0};
3767 :
3768 : /*
3769 : * Init the ApplyMessageContext which we clean up after each replication
3770 : * protocol message.
3771 : */
3772 770 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
3773 : "ApplyMessageContext",
3774 : ALLOCSET_DEFAULT_SIZES);
3775 :
3776 : /*
3777 : * This memory context is used for per-stream data when the streaming mode
3778 : * is enabled. This context is reset on each stream stop.
3779 : */
3780 770 : LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
3781 : "LogicalStreamingContext",
3782 : ALLOCSET_DEFAULT_SIZES);
3783 :
3784 : /* mark as idle, before starting to loop */
3785 770 : pgstat_report_activity(STATE_IDLE, NULL);
3786 :
3787 : /*
3788 : * Push apply error context callback. Fields will be filled while applying
3789 : * a change.
3790 : */
3791 770 : errcallback.callback = apply_error_callback;
3792 770 : errcallback.previous = error_context_stack;
3793 770 : error_context_stack = &errcallback;
3794 770 : apply_error_context_stack = error_context_stack;
3795 :
3796 : /* This outer loop iterates once per wait. */
3797 : for (;;)
3798 121060 : {
3799 121830 : pgsocket fd = PGINVALID_SOCKET;
3800 : int rc;
3801 : int len;
3802 121830 : char *buf = NULL;
3803 121830 : bool endofstream = false;
3804 : long wait_time;
3805 :
3806 121830 : CHECK_FOR_INTERRUPTS();
3807 :
3808 121830 : MemoryContextSwitchTo(ApplyMessageContext);
3809 :
3810 121830 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3811 :
3812 121796 : if (len != 0)
3813 : {
3814 : /* Loop to process all available data (without blocking). */
3815 : for (;;)
3816 : {
3817 492568 : CHECK_FOR_INTERRUPTS();
3818 :
3819 492568 : if (len == 0)
3820 : {
3821 119690 : break;
3822 : }
3823 372878 : else if (len < 0)
3824 : {
3825 18 : ereport(LOG,
3826 : (errmsg("data stream from publisher has ended")));
3827 18 : endofstream = true;
3828 18 : break;
3829 : }
3830 : else
3831 : {
3832 : int c;
3833 : StringInfoData s;
3834 :
3835 372860 : if (ConfigReloadPending)
3836 : {
3837 0 : ConfigReloadPending = false;
3838 0 : ProcessConfigFile(PGC_SIGHUP);
3839 : }
3840 :
3841 : /* Reset timeout. */
3842 372860 : last_recv_timestamp = GetCurrentTimestamp();
3843 372860 : ping_sent = false;
3844 :
3845 372860 : rdt_data.last_recv_time = last_recv_timestamp;
3846 :
3847 : /* Ensure we are reading the data into our memory context. */
3848 372860 : MemoryContextSwitchTo(ApplyMessageContext);
3849 :
3850 372860 : initReadOnlyStringInfo(&s, buf, len);
3851 :
3852 372860 : c = pq_getmsgbyte(&s);
3853 :
3854 372860 : if (c == 'w')
3855 : {
3856 : XLogRecPtr start_lsn;
3857 : XLogRecPtr end_lsn;
3858 : TimestampTz send_time;
3859 :
3860 369632 : start_lsn = pq_getmsgint64(&s);
3861 369632 : end_lsn = pq_getmsgint64(&s);
3862 369632 : send_time = pq_getmsgint64(&s);
3863 :
3864 369632 : if (last_received < start_lsn)
3865 296826 : last_received = start_lsn;
3866 :
3867 369632 : if (last_received < end_lsn)
3868 0 : last_received = end_lsn;
3869 :
3870 369632 : UpdateWorkerStats(last_received, send_time, false);
3871 :
3872 369632 : apply_dispatch(&s);
3873 :
3874 369558 : maybe_advance_nonremovable_xid(&rdt_data, false);
3875 : }
3876 3228 : else if (c == 'k')
3877 : {
3878 : XLogRecPtr end_lsn;
3879 : TimestampTz timestamp;
3880 : bool reply_requested;
3881 :
3882 3192 : end_lsn = pq_getmsgint64(&s);
3883 3192 : timestamp = pq_getmsgint64(&s);
3884 3192 : reply_requested = pq_getmsgbyte(&s);
3885 :
3886 3192 : if (last_received < end_lsn)
3887 1836 : last_received = end_lsn;
3888 :
3889 3192 : send_feedback(last_received, reply_requested, false);
3890 :
3891 3192 : maybe_advance_nonremovable_xid(&rdt_data, false);
3892 :
3893 3192 : UpdateWorkerStats(last_received, timestamp, true);
3894 : }
3895 36 : else if (c == 's') /* Primary status update */
3896 : {
3897 36 : rdt_data.remote_lsn = pq_getmsgint64(&s);
3898 36 : rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
3899 36 : rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
3900 36 : rdt_data.reply_time = pq_getmsgint64(&s);
3901 :
3902 : /*
3903 : * This should never happen, see
3904 : * ProcessStandbyPSRequestMessage. But if it happens
3905 : * due to a bug, we don't want to proceed as it can
3906 : * incorrectly advance oldest_nonremovable_xid.
3907 : */
3908 36 : if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
3909 0 : elog(ERROR, "cannot get the latest WAL position from the publisher");
3910 :
3911 36 : maybe_advance_nonremovable_xid(&rdt_data, true);
3912 :
3913 36 : UpdateWorkerStats(last_received, rdt_data.reply_time, false);
3914 : }
3915 : /* other message types are purposefully ignored */
3916 :
3917 372786 : MemoryContextReset(ApplyMessageContext);
3918 : }
3919 :
3920 372786 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3921 : }
3922 : }
3923 :
3924 : /* confirm all writes so far */
3925 121722 : send_feedback(last_received, false, false);
3926 :
3927 : /* Reset the timestamp if no message was received */
3928 121722 : rdt_data.last_recv_time = 0;
3929 :
3930 121722 : maybe_advance_nonremovable_xid(&rdt_data, false);
3931 :
3932 121722 : if (!in_remote_transaction && !in_streamed_transaction)
3933 : {
3934 : /*
3935 : * If we didn't get any transactions for a while there might be
3936 : * unconsumed invalidation messages in the queue, consume them
3937 : * now.
3938 : */
3939 5198 : AcceptInvalidationMessages();
3940 5198 : maybe_reread_subscription();
3941 :
3942 : /* Process any table synchronization changes. */
3943 5120 : process_syncing_tables(last_received);
3944 : }
3945 :
3946 : /* Cleanup the memory. */
3947 121262 : MemoryContextReset(ApplyMessageContext);
3948 121262 : MemoryContextSwitchTo(TopMemoryContext);
3949 :
3950 : /* Check if we need to exit the streaming loop. */
3951 121262 : if (endofstream)
3952 18 : break;
3953 :
3954 : /*
3955 : * Wait for more data or latch. If we have unflushed transactions,
3956 : * wake up after WalWriterDelay to see if they've been flushed yet (in
3957 : * which case we should send a feedback message). Otherwise, there's
3958 : * no particular urgency about waking up unless we get data or a
3959 : * signal.
3960 : */
3961 121244 : if (!dlist_is_empty(&lsn_mapping))
3962 14734 : wait_time = WalWriterDelay;
3963 : else
3964 106510 : wait_time = NAPTIME_PER_CYCLE;
3965 :
3966 : /*
3967 : * Ensure to wake up when it's possible to advance the non-removable
3968 : * transaction ID.
3969 : */
3970 121244 : if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
3971 121172 : rdt_data.xid_advance_interval)
3972 80 : wait_time = Min(wait_time, rdt_data.xid_advance_interval);
3973 :
3974 121244 : rc = WaitLatchOrSocket(MyLatch,
3975 : WL_SOCKET_READABLE | WL_LATCH_SET |
3976 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
3977 : fd, wait_time,
3978 : WAIT_EVENT_LOGICAL_APPLY_MAIN);
3979 :
3980 121244 : if (rc & WL_LATCH_SET)
3981 : {
3982 1346 : ResetLatch(MyLatch);
3983 1346 : CHECK_FOR_INTERRUPTS();
3984 : }
3985 :
3986 121060 : if (ConfigReloadPending)
3987 : {
3988 16 : ConfigReloadPending = false;
3989 16 : ProcessConfigFile(PGC_SIGHUP);
3990 : }
3991 :
3992 121060 : if (rc & WL_TIMEOUT)
3993 : {
3994 : /*
3995 : * We didn't receive anything new. If we haven't heard anything
3996 : * from the server for more than wal_receiver_timeout / 2, ping
3997 : * the server. Also, if it's been longer than
3998 : * wal_receiver_status_interval since the last update we sent,
3999 : * send a status update to the primary anyway, to report any
4000 : * progress in applying WAL.
4001 : */
4002 352 : bool requestReply = false;
4003 :
4004 : /*
4005 : * Check if time since last receive from primary has reached the
4006 : * configured limit.
4007 : */
4008 352 : if (wal_receiver_timeout > 0)
4009 : {
4010 352 : TimestampTz now = GetCurrentTimestamp();
4011 : TimestampTz timeout;
4012 :
4013 352 : timeout =
4014 352 : TimestampTzPlusMilliseconds(last_recv_timestamp,
4015 : wal_receiver_timeout);
4016 :
4017 352 : if (now >= timeout)
4018 0 : ereport(ERROR,
4019 : (errcode(ERRCODE_CONNECTION_FAILURE),
4020 : errmsg("terminating logical replication worker due to timeout")));
4021 :
4022 : /* Check to see if it's time for a ping. */
4023 352 : if (!ping_sent)
4024 : {
4025 352 : timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
4026 : (wal_receiver_timeout / 2));
4027 352 : if (now >= timeout)
4028 : {
4029 0 : requestReply = true;
4030 0 : ping_sent = true;
4031 : }
4032 : }
4033 : }
4034 :
4035 352 : send_feedback(last_received, requestReply, requestReply);
4036 :
4037 352 : maybe_advance_nonremovable_xid(&rdt_data, false);
4038 :
4039 : /*
4040 : * Force reporting to ensure long idle periods don't lead to
4041 : * arbitrarily delayed stats. Stats can only be reported outside
4042 : * of (implicit or explicit) transactions. That shouldn't lead to
4043 : * stats being delayed for long, because transactions are either
4044 : * sent as a whole on commit or streamed. Streamed transactions
4045 : * are spilled to disk and applied on commit.
4046 : */
4047 352 : if (!IsTransactionState())
4048 352 : pgstat_report_stat(true);
4049 : }
4050 : }
4051 :
4052 : /* Pop the error context stack */
4053 18 : error_context_stack = errcallback.previous;
4054 18 : apply_error_context_stack = error_context_stack;
4055 :
4056 : /* All done */
4057 18 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
4058 0 : }
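              /*
               * Editor's sketch (not part of worker.c): the copy-data payloads parsed in
               * the loop above, after the leading type byte:
               *
               *    'w' (WAL data)       : Int64 start_lsn, Int64 end_lsn, Int64 send_time,
               *                           then one logical replication message, handed to
               *                           apply_dispatch()
               *    'k' (keepalive)      : Int64 end_lsn, Int64 timestamp,
               *                           Byte1 reply_requested
               *    's' (primary status) : Int64 remote_lsn, Int64 remote_oldestxid,
               *                           Int64 remote_nextxid, Int64 reply_time, feeding
               *                           the retain-dead-tuples machinery below
               *
               * Any other message type is purposefully ignored.
               */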
4059 :
4060 : /*
4061 : * Send a Standby Status Update message to server.
4062 : *
        4063             :  * 'recvpos' is the latest LSN we've received data up to; 'force' is set if we
        4064             :  * need to send a response to avoid timeouts.
4065 : */
4066 : static void
4067 125266 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
4068 : {
4069 : static StringInfo reply_message = NULL;
4070 : static TimestampTz send_time = 0;
4071 :
4072 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
4073 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
4074 :
4075 : XLogRecPtr writepos;
4076 : XLogRecPtr flushpos;
4077 : TimestampTz now;
4078 : bool have_pending_txes;
4079 :
4080 : /*
4081 : * If the user doesn't want status to be reported to the publisher, be
4082 : * sure to exit before doing anything at all.
4083 : */
4084 125266 : if (!force && wal_receiver_status_interval <= 0)
4085 33078 : return;
4086 :
4087 : /* It's legal to not pass a recvpos */
4088 125266 : if (recvpos < last_recvpos)
4089 0 : recvpos = last_recvpos;
4090 :
4091 125266 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4092 :
4093 : /*
        4094             :      * With no outstanding transactions to flush, we can report the latest
        4095             :      * received position. This is important for synchronous replication.
4096 : */
4097 125266 : if (!have_pending_txes)
4098 109484 : flushpos = writepos = recvpos;
4099 :
4100 125266 : if (writepos < last_writepos)
4101 0 : writepos = last_writepos;
4102 :
4103 125266 : if (flushpos < last_flushpos)
4104 15694 : flushpos = last_flushpos;
4105 :
4106 125266 : now = GetCurrentTimestamp();
4107 :
4108 : /* if we've already reported everything we're good */
4109 125266 : if (!force &&
4110 125262 : writepos == last_writepos &&
4111 33606 : flushpos == last_flushpos &&
4112 33340 : !TimestampDifferenceExceeds(send_time, now,
4113 : wal_receiver_status_interval * 1000))
4114 33078 : return;
4115 92188 : send_time = now;
4116 :
4117 92188 : if (!reply_message)
4118 : {
4119 770 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4120 :
4121 770 : reply_message = makeStringInfo();
4122 770 : MemoryContextSwitchTo(oldctx);
4123 : }
4124 : else
4125 91418 : resetStringInfo(reply_message);
4126 :
4127 92188 : pq_sendbyte(reply_message, 'r');
4128 92188 : pq_sendint64(reply_message, recvpos); /* write */
4129 92188 : pq_sendint64(reply_message, flushpos); /* flush */
4130 92188 : pq_sendint64(reply_message, writepos); /* apply */
4131 92188 : pq_sendint64(reply_message, now); /* sendTime */
4132 92188 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
4133 :
4134 92188 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
4135 : force,
4136 : LSN_FORMAT_ARGS(recvpos),
4137 : LSN_FORMAT_ARGS(writepos),
4138 : LSN_FORMAT_ARGS(flushpos));
4139 :
4140 92188 : walrcv_send(LogRepWorkerWalRcvConn,
4141 : reply_message->data, reply_message->len);
4142 :
4143 92188 : if (recvpos > last_recvpos)
4144 91656 : last_recvpos = recvpos;
4145 92188 : if (writepos > last_writepos)
4146 91658 : last_writepos = writepos;
4147 92188 : if (flushpos > last_flushpos)
4148 91156 : last_flushpos = flushpos;
4149 : }
4150 :
4151 : /*
4152 : * Attempt to advance the non-removable transaction ID.
4153 : *
4154 : * See comments atop worker.c for details.
4155 : */
4156 : static void
4157 494860 : maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
4158 : bool status_received)
4159 : {
4160 494860 : if (!can_advance_nonremovable_xid(rdt_data))
4161 494434 : return;
4162 :
4163 426 : process_rdt_phase_transition(rdt_data, status_received);
4164 : }
4165 :
4166 : /*
4167 : * Preliminary check to determine if advancing the non-removable transaction ID
4168 : * is allowed.
4169 : */
4170 : static bool
4171 494860 : can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
4172 : {
4173 : /*
        4174             :      * It is sufficient for the main apply worker to manage the non-removable
        4175             :      * transaction ID for a subscription; this is enough to detect conflicts
        4176             :      * reliably even for table sync or parallel apply workers.
4177 : */
4178 494860 : if (!am_leader_apply_worker())
4179 592 : return false;
4180 :
4181 : /* No need to advance if retaining dead tuples is not required */
4182 494268 : if (!MySubscription->retaindeadtuples)
4183 493842 : return false;
4184 :
4185 426 : return true;
4186 : }
4187 :
4188 : /*
4189 : * Process phase transitions during the non-removable transaction ID
4190 : * advancement. See comments atop worker.c for details of the transition.
4191 : */
4192 : static void
4193 532 : process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
4194 : bool status_received)
4195 : {
4196 532 : switch (rdt_data->phase)
4197 : {
4198 206 : case RDT_GET_CANDIDATE_XID:
4199 206 : get_candidate_xid(rdt_data);
4200 206 : break;
4201 38 : case RDT_REQUEST_PUBLISHER_STATUS:
4202 38 : request_publisher_status(rdt_data);
4203 38 : break;
4204 182 : case RDT_WAIT_FOR_PUBLISHER_STATUS:
4205 182 : wait_for_publisher_status(rdt_data, status_received);
4206 182 : break;
4207 106 : case RDT_WAIT_FOR_LOCAL_FLUSH:
4208 106 : wait_for_local_flush(rdt_data);
4209 106 : break;
4210 : }
4211 532 : }
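              /*
               * Editor's sketch (not part of worker.c): the phase transitions driven by
               * process_rdt_phase_transition() above, as implemented by the workhorse
               * functions that follow:
               *
               *    RDT_GET_CANDIDATE_XID
               *        -> RDT_REQUEST_PUBLISHER_STATUS    (a newer candidate_xid was found)
               *    RDT_REQUEST_PUBLISHER_STATUS
               *        -> RDT_WAIT_FOR_PUBLISHER_STATUS   (status request sent)
               *    RDT_WAIT_FOR_PUBLISHER_STATUS
               *        -> RDT_WAIT_FOR_LOCAL_FLUSH        (remote_oldestxid has caught up)
               *        -> RDT_REQUEST_PUBLISHER_STATUS    (otherwise, ask again)
               *    RDT_WAIT_FOR_LOCAL_FLUSH
               *        -> RDT_GET_CANDIDATE_XID           (remote_lsn flushed locally and
               *                                            oldest_nonremovable_xid advanced)
               */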
4212 :
4213 : /*
4214 : * Workhorse for the RDT_GET_CANDIDATE_XID phase.
4215 : */
4216 : static void
4217 206 : get_candidate_xid(RetainDeadTuplesData *rdt_data)
4218 : {
4219 : TransactionId oldest_running_xid;
4220 : TimestampTz now;
4221 :
4222 : /*
4223 : * Use last_recv_time when applying changes in the loop to avoid
4224 : * unnecessary system time retrieval. If last_recv_time is not available,
4225 : * obtain the current timestamp.
4226 : */
4227 206 : now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
4228 :
4229 : /*
4230 : * Compute the candidate_xid and request the publisher status at most once
4231 : * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
4232 : * details on how this value is dynamically adjusted. This is to avoid
4233 : * using CPU and network resources without making much progress.
4234 : */
4235 206 : if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
4236 : rdt_data->xid_advance_interval))
4237 150 : return;
4238 :
4239 : /*
4240 : * Immediately update the timer, even if the function returns later
4241 : * without setting candidate_xid due to inactivity on the subscriber. This
4242 : * avoids frequent calls to GetOldestActiveTransactionId.
4243 : */
4244 56 : rdt_data->candidate_xid_time = now;
4245 :
4246 : /*
4247 : * Consider transactions in the current database, as only dead tuples from
4248 : * this database are required for conflict detection.
4249 : */
4250 56 : oldest_running_xid = GetOldestActiveTransactionId(false, false);
4251 :
4252 : /*
        4253             :      * The oldest active transaction ID (oldest_running_xid) can't be behind
        4254             :      * any of its previously computed values.
4255 : */
4256 : Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4257 : oldest_running_xid));
4258 :
4259 : /* Return if the oldest_nonremovable_xid cannot be advanced */
4260 56 : if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
4261 : oldest_running_xid))
4262 : {
4263 18 : adjust_xid_advance_interval(rdt_data, false);
4264 18 : return;
4265 : }
4266 :
4267 38 : adjust_xid_advance_interval(rdt_data, true);
4268 :
4269 38 : rdt_data->candidate_xid = oldest_running_xid;
4270 38 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4271 :
4272 : /* process the next phase */
4273 38 : process_rdt_phase_transition(rdt_data, false);
4274 : }
4275 :
4276 : /*
4277 : * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
4278 : */
4279 : static void
4280 38 : request_publisher_status(RetainDeadTuplesData *rdt_data)
4281 : {
4282 : static StringInfo request_message = NULL;
4283 :
4284 38 : if (!request_message)
4285 : {
4286 14 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
4287 :
4288 14 : request_message = makeStringInfo();
4289 14 : MemoryContextSwitchTo(oldctx);
4290 : }
4291 : else
4292 24 : resetStringInfo(request_message);
4293 :
4294 : /*
4295 : * Send the current time to update the remote walsender's latest reply
4296 : * message received time.
4297 : */
4298 38 : pq_sendbyte(request_message, 'p');
4299 38 : pq_sendint64(request_message, GetCurrentTimestamp());
4300 :
4301 38 : elog(DEBUG2, "sending publisher status request message");
4302 :
4303 : /* Send a request for the publisher status */
4304 38 : walrcv_send(LogRepWorkerWalRcvConn,
4305 : request_message->data, request_message->len);
4306 :
4307 38 : rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
4308 :
4309 : /*
4310 : * Skip calling maybe_advance_nonremovable_xid() since further transition
4311 : * is possible only once we receive the publisher status message.
4312 : */
4313 38 : }
4314 :
4315 : /*
4316 : * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
4317 : */
4318 : static void
4319 182 : wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
4320 : bool status_received)
4321 : {
4322 : /*
4323 : * Return if we have requested but not yet received the publisher status.
4324 : */
4325 182 : if (!status_received)
4326 146 : return;
4327 :
4328 36 : if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
4329 36 : rdt_data->remote_wait_for = rdt_data->remote_nextxid;
4330 :
4331 : /*
4332 : * Check if all remote concurrent transactions that were active at the
4333 : * first status request have now completed. If completed, proceed to the
4334 : * next phase; otherwise, continue checking the publisher status until
4335 : * these transactions finish.
4336 : *
4337 : * It's possible that transactions in the commit phase during the last
4338 : * cycle have now finished committing, but remote_oldestxid remains older
        4339             :      * than remote_wait_for. This can happen if some old transaction entered
        4340             :      * the commit phase when we requested status in this cycle. We do not
4341 : * handle this case explicitly as it's rare and the benefit doesn't
4342 : * justify the required complexity. Tracking would require either caching
4343 : * all xids at the publisher or sending them to subscribers. The condition
4344 : * will resolve naturally once the remaining transactions are finished.
4345 : *
4346 : * Directly advancing the non-removable transaction ID is possible if
4347 : * there are no activities on the publisher since the last advancement
4348 : * cycle. However, it requires maintaining two fields, last_remote_nextxid
4349 : * and last_remote_lsn, within the structure for comparison with the
4350 : * current cycle's values. Considering the minimal cost of continuing in
4351 : * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
4352 : * advance the transaction ID here.
4353 : */
4354 36 : if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
4355 : rdt_data->remote_oldestxid))
4356 36 : rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
4357 : else
4358 0 : rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
4359 :
4360 : /* process the next phase */
4361 36 : process_rdt_phase_transition(rdt_data, false);
4362 : }
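              /*
               * Editor's sketch (not part of worker.c): a worked example of the wait
               * condition above, with hypothetical transaction IDs.  If the first status
               * reply reports remote_nextxid = 750, remote_wait_for is pinned at 750.  The
               * phase advances to RDT_WAIT_FOR_LOCAL_FLUSH only once a reply shows
               * remote_oldestxid >= 750, i.e. every transaction that was already running
               * on the publisher at that point has finished; until then we loop back to
               * RDT_REQUEST_PUBLISHER_STATUS.
               */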
4363 :
4364 : /*
4365 : * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
4366 : */
4367 : static void
4368 106 : wait_for_local_flush(RetainDeadTuplesData *rdt_data)
4369 : {
4370 : Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
4371 : TransactionIdIsValid(rdt_data->candidate_xid));
4372 :
4373 : /*
4374 : * We expect the publisher and subscriber clocks to be in sync using time
        4375             :      * We expect the publisher and subscriber clocks to be kept in sync by a
        4376             :      * time sync service like NTP. Otherwise, we will advance this worker's
4377 : * required to detect conflicts reliably. This check primarily addresses
4378 : * scenarios where the publisher's clock falls behind; if the publisher's
4379 : * clock is ahead, subsequent transactions will naturally bear later
4380 : * commit timestamps, conforming to the design outlined atop worker.c.
4381 : *
4382 : * XXX Consider waiting for the publisher's clock to catch up with the
4383 : * subscriber's before proceeding to the next phase.
4384 : */
4385 106 : if (TimestampDifferenceExceeds(rdt_data->reply_time,
4386 : rdt_data->candidate_xid_time, 0))
4387 0 : ereport(ERROR,
4388 : errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
4389 : errdetail_internal("The clock on the publisher is behind that of the subscriber."));
4390 :
4391 : /*
4392 : * Do not attempt to advance the non-removable transaction ID when table
4393 : * sync is in progress. During this time, changes from a single
4394 : * transaction may be applied by multiple table sync workers corresponding
4395 : * to the target tables. So, it's necessary for all table sync workers to
4396 : * apply and flush the corresponding changes before advancing the
4397 : * transaction ID, otherwise, dead tuples that are still needed for
4398 : * conflict detection in table sync workers could be removed prematurely.
4399 : * However, confirming the apply and flush progress across all table sync
4400 : * workers is complex and not worth the effort, so we simply return if not
4401 : * all tables are in the READY state.
4402 : *
4403 : * It is safe to add new tables with initial states to the subscription
4404 : * after this check because any changes applied to these tables should
4405 : * have a WAL position greater than the rdt_data->remote_lsn.
4406 : */
4407 106 : if (!AllTablesyncsReady())
4408 6 : return;
4409 :
4410 : /*
4411 : * Update and check the remote flush position if we are applying changes
4412 : * in a loop. This is done at most once per WalWriterDelay to avoid
4413 : * performing costly operations in get_flush_position() too frequently
4414 : * during change application.
4415 : */
4416 138 : if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
4417 38 : TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
4418 : rdt_data->last_recv_time, WalWriterDelay))
4419 : {
4420 : XLogRecPtr writepos;
4421 : XLogRecPtr flushpos;
4422 : bool have_pending_txes;
4423 :
4424 : /* Fetch the latest remote flush position */
4425 16 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
4426 :
4427 16 : if (flushpos > last_flushpos)
4428 0 : last_flushpos = flushpos;
4429 :
4430 16 : rdt_data->flushpos_update_time = rdt_data->last_recv_time;
4431 : }
4432 :
4433 : /* Return to wait for the changes to be applied */
4434 100 : if (last_flushpos < rdt_data->remote_lsn)
4435 68 : return;
4436 :
4437 : /*
4438 : * Reaching here means the remote WAL position has been received, and all
4439 : * transactions up to that position on the publisher have been applied and
4440 : * flushed locally. So, we can advance the non-removable transaction ID.
4441 : */
4442 32 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
4443 32 : MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
4444 32 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
4445 :
4446 32 : elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u",
4447 : LSN_FORMAT_ARGS(rdt_data->remote_lsn),
4448 : rdt_data->candidate_xid);
4449 :
4450 : /* Notify launcher to update the xmin of the conflict slot */
4451 32 : ApplyLauncherWakeup();
4452 :
4453 : /*
4454 : * Reset all data fields except those used to determine the timing for the
4455 : * next round of transaction ID advancement. We can even use
4456 : * flushpos_update_time in the next round to decide whether to get the
4457 : * latest flush position.
4458 : */
4459 32 : rdt_data->phase = RDT_GET_CANDIDATE_XID;
4460 32 : rdt_data->remote_lsn = InvalidXLogRecPtr;
4461 32 : rdt_data->remote_oldestxid = InvalidFullTransactionId;
4462 32 : rdt_data->remote_nextxid = InvalidFullTransactionId;
4463 32 : rdt_data->reply_time = 0;
4464 32 : rdt_data->remote_wait_for = InvalidFullTransactionId;
4465 32 : rdt_data->candidate_xid = InvalidTransactionId;
4466 :
4467 : /* process the next phase */
4468 32 : process_rdt_phase_transition(rdt_data, false);
4469 : }
4470 :
4471 : /*
4472 : * Adjust the interval for advancing non-removable transaction IDs.
4473 : *
4474 : * We double the interval to try advancing the non-removable transaction IDs
4475 : * if there is no activity on the node. The maximum value of the interval is
4476 : * capped by wal_receiver_status_interval if it is not zero, otherwise to a
        4477             :  * capped by wal_receiver_status_interval if it is not zero, otherwise at
        4478             :  * 3 minutes, which should be sufficient to avoid using CPU or network
4479 : *
4480 : * The interval is reset to a minimum value of 100ms once there is some
4481 : * activity on the node.
4482 : *
4483 : * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
4484 : * consider the other interval or a separate GUC if the need arises.
4485 : */
4486 : static void
4487 56 : adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
4488 : {
4489 56 : if (!new_xid_found && rdt_data->xid_advance_interval)
4490 14 : {
4491 14 : int max_interval = wal_receiver_status_interval
4492 28 : ? wal_receiver_status_interval * 1000
4493 14 : : MAX_XID_ADVANCE_INTERVAL;
4494 :
4495 : /*
4496 : * No new transaction ID has been assigned since the last check, so
4497 : * double the interval, but not beyond the maximum allowable value.
4498 : */
4499 14 : rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
4500 : max_interval);
4501 : }
4502 : else
4503 : {
4504 : /*
4505 : * A new transaction ID was found or the interval is not yet
4506 : * initialized, so set the interval to the minimum value.
4507 : */
4508 42 : rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
4509 : }
4510 56 : }
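              /*
               * Editor's sketch (not part of worker.c): assuming the 100ms minimum noted
               * in the comment above and the stock 10s wal_receiver_status_interval, an
               * idle subscriber backs off as 100ms -> 200ms -> 400ms -> ... -> 10s, the
               * cap being wal_receiver_status_interval * 1000, or
               * MAX_XID_ADVANCE_INTERVAL (about 3 minutes) when that GUC is zero.  As
               * soon as a new candidate transaction ID is found, the interval snaps back
               * to the minimum.
               */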
4511 :
4512 : /*
4513 : * Exit routine for apply workers due to subscription parameter changes.
4514 : */
4515 : static void
4516 82 : apply_worker_exit(void)
4517 : {
4518 82 : if (am_parallel_apply_worker())
4519 : {
4520 : /*
4521 : * Don't stop the parallel apply worker as the leader will detect the
4522 : * subscription parameter change and restart logical replication later
4523 : * anyway. This also prevents the leader from reporting errors when
4524 : * trying to communicate with a stopped parallel apply worker, which
4525 : * would accidentally disable subscriptions if disable_on_error was
4526 : * set.
4527 : */
4528 0 : return;
4529 : }
4530 :
4531 : /*
4532 : * Reset the last-start time for this apply worker so that the launcher
4533 : * will restart it without waiting for wal_retrieve_retry_interval if the
4534 : * subscription is still active, and so that we won't leak that hash table
4535 : * entry if it isn't.
4536 : */
4537 82 : if (am_leader_apply_worker())
4538 82 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4539 :
4540 82 : proc_exit(0);
4541 : }
4542 :
4543 : /*
4544 : * Reread subscription info if needed.
4545 : *
4546 : * For significant changes, we react by exiting the current process; a new
4547 : * one will be launched afterwards if needed.
4548 : */
4549 : void
4550 7178 : maybe_reread_subscription(void)
4551 : {
4552 : MemoryContext oldctx;
4553 : Subscription *newsub;
4554 7178 : bool started_tx = false;
4555 :
4556 : /* When cache state is valid there is nothing to do here. */
4557 7178 : if (MySubscriptionValid)
4558 7014 : return;
4559 :
4560 : /* This function might be called inside or outside of transaction. */
4561 164 : if (!IsTransactionState())
4562 : {
4563 156 : StartTransactionCommand();
4564 156 : started_tx = true;
4565 : }
4566 :
4567 : /* Ensure allocations in permanent context. */
4568 164 : oldctx = MemoryContextSwitchTo(ApplyContext);
4569 :
4570 164 : newsub = GetSubscription(MyLogicalRepWorker->subid, true);
4571 :
4572 : /*
4573 : * Exit if the subscription was removed. This normally should not happen
4574 : * as the worker gets killed during DROP SUBSCRIPTION.
4575 : */
4576 164 : if (!newsub)
4577 : {
4578 0 : ereport(LOG,
4579 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
4580 : MySubscription->name)));
4581 :
4582 : /* Ensure we remove no-longer-useful entry for worker's start time */
4583 0 : if (am_leader_apply_worker())
4584 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4585 :
4586 0 : proc_exit(0);
4587 : }
4588 :
4589 : /* Exit if the subscription was disabled. */
4590 164 : if (!newsub->enabled)
4591 : {
4592 22 : ereport(LOG,
4593 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
4594 : MySubscription->name)));
4595 :
4596 22 : apply_worker_exit();
4597 : }
4598 :
4599 : /* !slotname should never happen when enabled is true. */
4600 : Assert(newsub->slotname);
4601 :
4602 : /* two-phase cannot be altered while the worker is running */
4603 : Assert(newsub->twophasestate == MySubscription->twophasestate);
4604 :
4605 : /*
4606 : * Exit if any parameter that affects the remote connection was changed.
4607 : * The launcher will start a new worker but note that the parallel apply
4608 : * worker won't restart if the streaming option's value is changed from
4609 : * 'parallel' to any other value or the server decides not to stream the
4610 : * in-progress transaction.
4611 : */
4612 142 : if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
4613 138 : strcmp(newsub->name, MySubscription->name) != 0 ||
4614 136 : strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
4615 136 : newsub->binary != MySubscription->binary ||
4616 124 : newsub->stream != MySubscription->stream ||
4617 114 : newsub->passwordrequired != MySubscription->passwordrequired ||
4618 114 : strcmp(newsub->origin, MySubscription->origin) != 0 ||
4619 110 : newsub->owner != MySubscription->owner ||
4620 108 : !equal(newsub->publications, MySubscription->publications))
4621 : {
4622 52 : if (am_parallel_apply_worker())
4623 0 : ereport(LOG,
4624 : (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
4625 : MySubscription->name)));
4626 : else
4627 52 : ereport(LOG,
4628 : (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
4629 : MySubscription->name)));
4630 :
4631 52 : apply_worker_exit();
4632 : }
4633 :
4634 : /*
4635 : * Exit if the subscription owner's superuser privileges have been
4636 : * revoked.
4637 : */
4638 90 : if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
4639 : {
4640 8 : if (am_parallel_apply_worker())
4641 0 : ereport(LOG,
4642 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
4643 : MySubscription->name));
4644 : else
4645 8 : ereport(LOG,
4646 : errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
4647 : MySubscription->name));
4648 :
4649 8 : apply_worker_exit();
4650 : }
4651 :
4652 : /* Check for other changes that should never happen too. */
4653 82 : if (newsub->dbid != MySubscription->dbid)
4654 : {
4655 0 : elog(ERROR, "subscription %u changed unexpectedly",
4656 : MyLogicalRepWorker->subid);
4657 : }
4658 :
4659 : /* Clean old subscription info and switch to new one. */
4660 82 : FreeSubscription(MySubscription);
4661 82 : MySubscription = newsub;
4662 :
4663 82 : MemoryContextSwitchTo(oldctx);
4664 :
4665 : /* Change synchronous commit according to the user's wishes */
4666 82 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
4667 : PGC_BACKEND, PGC_S_OVERRIDE);
4668 :
4669 82 : if (started_tx)
4670 78 : CommitTransactionCommand();
4671 :
4672 82 : MySubscriptionValid = true;
4673 : }
4674 :
4675 : /*
4676 : * Callback from subscription syscache invalidation.
4677 : */
4678 : static void
4679 170 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
4680 : {
4681 170 : MySubscriptionValid = false;
4682 170 : }
4683 :
4684 : /*
4685 : * subxact_info_write
4686 : * Store information about subxacts for a toplevel transaction.
4687 : *
4688 : * For each subxact we store offset of its first change in the main file.
4689 : * The file is always over-written as a whole.
4690 : *
4691 : * XXX We should only store subxacts that were not aborted yet.
4692 : */
4693 : static void
4694 742 : subxact_info_write(Oid subid, TransactionId xid)
4695 : {
4696 : char path[MAXPGPATH];
4697 : Size len;
4698 : BufFile *fd;
4699 :
4700 : Assert(TransactionIdIsValid(xid));
4701 :
4702 : /* construct the subxact filename */
4703 742 : subxact_filename(path, subid, xid);
4704 :
4705 : /* Delete the subxacts file, if it exists. */
4706 742 : if (subxact_data.nsubxacts == 0)
4707 : {
4708 578 : cleanup_subxact_info();
4709 578 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4710 :
4711 578 : return;
4712 : }
4713 :
4714 : /*
4715 : * Create the subxact file if it has not already been created, otherwise
4716 : * open the existing file.
4717 : */
4718 164 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
4719 : true);
4720 164 : if (fd == NULL)
4721 16 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
4722 :
4723 164 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4724 :
4725 : /* Write the subxact count and subxact info */
4726 164 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4727 164 : BufFileWrite(fd, subxact_data.subxacts, len);
4728 :
4729 164 : BufFileClose(fd);
4730 :
4731 : /* free the memory allocated for subxact info */
4732 164 : cleanup_subxact_info();
4733 : }
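/*
 * Editor's note (illustrative, not part of the original source): the subxact
 * file written above has a simple flat layout that subxact_info_read() below
 * restores verbatim.  Assuming the SubXactInfo fields used elsewhere in this
 * section (xid, fileno, offset), the file contents are:
 *
 *     nsubxacts                         -- subxact count, written first
 *     SubXactInfo subxacts[nsubxacts]   -- one { xid, fileno, offset } entry
 *                                          per subtransaction, in the order
 *                                          the subxacts were first seen
 */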
4734 :
4735 : /*
4736 : * subxact_info_read
4737 : * Restore information about subxacts of a streamed transaction.
4738 : *
4739 : * Read information about subxacts into the structure subxact_data that can be
4740 : * used later.
4741 : */
4742 : static void
4743 686 : subxact_info_read(Oid subid, TransactionId xid)
4744 : {
4745 : char path[MAXPGPATH];
4746 : Size len;
4747 : BufFile *fd;
4748 : MemoryContext oldctx;
4749 :
4750 : Assert(!subxact_data.subxacts);
4751 : Assert(subxact_data.nsubxacts == 0);
4752 : Assert(subxact_data.nsubxacts_max == 0);
4753 :
4754 : /*
4755 : * If the subxact file doesn't exist, that means we don't have any subxact
4756 : * info.
4757 : */
4758 686 : subxact_filename(path, subid, xid);
4759 686 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
4760 : true);
4761 686 : if (fd == NULL)
4762 528 : return;
4763 :
4764 : /* read number of subxact items */
4765 158 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4766 :
4767 158 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4768 :
4769 : /* we keep the maximum as a power of 2 */
4770 158 : subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
4771 :
4772 : /*
4773 : * Allocate the subxact information in the logical streaming context. We
4774 : * need this information for the whole duration of the stream so that we
4775 : * can keep adding subtransaction info to it. On stream stop we flush this
4776 : * information to the subxact file and reset the logical streaming context.
4777 : */
4778 158 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4779 158 : subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
4780 : sizeof(SubXactInfo));
4781 158 : MemoryContextSwitchTo(oldctx);
4782 :
4783 158 : if (len > 0)
4784 158 : BufFileReadExact(fd, subxact_data.subxacts, len);
4785 :
4786 158 : BufFileClose(fd);
4787 : }
4788 :
4789 : /*
4790 : * subxact_info_add
4791 : * Add information about a subxact (offset in the main file).
4792 : */
4793 : static void
4794 205024 : subxact_info_add(TransactionId xid)
4795 : {
4796 205024 : SubXactInfo *subxacts = subxact_data.subxacts;
4797 : int64 i;
4798 :
4799 : /* We must have a valid top level stream xid and a stream fd. */
4800 : Assert(TransactionIdIsValid(stream_xid));
4801 : Assert(stream_fd != NULL);
4802 :
4803 : /*
4804 : * If the XID matches the toplevel transaction, we don't want to add it.
4805 : */
4806 205024 : if (stream_xid == xid)
4807 184776 : return;
4808 :
4809 : /*
4810 : * In most cases we're checking the same subxact as we've already seen in
4811 : * the last call, so make sure to ignore it (this change comes later).
4812 : */
4813 20248 : if (subxact_data.subxact_last == xid)
4814 20096 : return;
4815 :
4816 : /* OK, remember we're processing this XID. */
4817 152 : subxact_data.subxact_last = xid;
4818 :
4819 : /*
4820 : * Check if the transaction is already present in the array of subxacts. We
4821 : * intentionally scan the array from the tail, because we're likely adding
4822 : * a change for the most recent subtransactions.
4823 : *
4824 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4825 : * would allow us to use binary search here.
4826 : */
4827 190 : for (i = subxact_data.nsubxacts; i > 0; i--)
4828 : {
4829 : /* found, so we're done */
4830 152 : if (subxacts[i - 1].xid == xid)
4831 114 : return;
4832 : }
4833 :
4834 : /* This is a new subxact, so we need to add it to the array. */
4835 38 : if (subxact_data.nsubxacts == 0)
4836 : {
4837 : MemoryContext oldctx;
4838 :
4839 16 : subxact_data.nsubxacts_max = 128;
4840 :
4841 : /*
4842 : * Allocate this memory for subxacts in the per-stream context; see
4843 : * subxact_info_read.
4844 : */
4845 16 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4846 16 : subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4847 16 : MemoryContextSwitchTo(oldctx);
4848 : }
4849 22 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
4850 : {
4851 20 : subxact_data.nsubxacts_max *= 2;
4852 20 : subxacts = repalloc(subxacts,
4853 20 : subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4854 : }
4855 :
4856 38 : subxacts[subxact_data.nsubxacts].xid = xid;
4857 :
4858 : /*
4859 : * Get the current offset of the stream file and store it as offset of
4860 : * this subxact.
4861 : */
4862 38 : BufFileTell(stream_fd,
4863 38 : &subxacts[subxact_data.nsubxacts].fileno,
4864 38 : &subxacts[subxact_data.nsubxacts].offset);
4865 :
4866 38 : subxact_data.nsubxacts++;
4867 38 : subxact_data.subxacts = subxacts;
4868 : }
4869 :
4870 : /* format filename for file containing the info about subxacts */
4871 : static inline void
4872 1490 : subxact_filename(char *path, Oid subid, TransactionId xid)
4873 : {
4874 1490 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4875 1490 : }
4876 :
4877 : /* format filename for file containing serialized changes */
4878 : static inline void
4879 874 : changes_filename(char *path, Oid subid, TransactionId xid)
4880 : {
4881 874 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4882 874 : }
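/*
 * Editor's note (illustrative): with the formats above, a subscription with
 * OID 16394 spooling remote transaction 752 would use the hypothetical file
 * names "16394-752.changes" and "16394-752.subxacts" within the worker's
 * stream fileset (the OID and XID values here are examples only).
 */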
4883 :
4884 : /*
4885 : * stream_cleanup_files
4886 : * Cleanup files for a subscription / toplevel transaction.
4887 : *
4888 : * Remove files with serialized changes and subxact info for a particular
4889 : * toplevel transaction. Each subscription has a separate set of files
4890 : * for any toplevel transaction.
4891 : */
4892 : void
4893 62 : stream_cleanup_files(Oid subid, TransactionId xid)
4894 : {
4895 : char path[MAXPGPATH];
4896 :
4897 : /* Delete the changes file. */
4898 62 : changes_filename(path, subid, xid);
4899 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
4900 :
4901 : /* Delete the subxact file, if it exists. */
4902 62 : subxact_filename(path, subid, xid);
4903 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4904 62 : }
4905 :
4906 : /*
4907 : * stream_open_file
4908 : * Open a file that we'll use to serialize changes for a toplevel
4909 : * transaction.
4910 : *
4911 : * Open a file for streamed changes from a toplevel transaction identified
4912 : * by stream_xid (global variable). If it's the first chunk of streamed
4913 : * changes for this transaction, create the buffile, otherwise open the
4914 : * previously created file.
4915 : */
4916 : static void
4917 724 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4918 : {
4919 : char path[MAXPGPATH];
4920 : MemoryContext oldcxt;
4921 :
4922 : Assert(OidIsValid(subid));
4923 : Assert(TransactionIdIsValid(xid));
4924 : Assert(stream_fd == NULL);
4925 :
4926 :
4927 724 : changes_filename(path, subid, xid);
4928 724 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4929 :
4930 : /*
4931 : * Create/open the BufFiles under the logical streaming context so that
4932 : * those files remain available until stream stop.
4933 : */
4934 724 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
4935 :
4936 : /*
4937 : * If this is the first streamed segment, create the changes file.
4938 : * Otherwise, just open the file for writing, in append mode.
4939 : */
4940 724 : if (first_segment)
4941 64 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
4942 : path);
4943 : else
4944 : {
4945 : /*
4946 : * Open the file and seek to the end of the file because we always
4947 : * append to the changes file.
4948 : */
4949 660 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
4950 : path, O_RDWR, false);
4951 660 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
4952 : }
4953 :
4954 724 : MemoryContextSwitchTo(oldcxt);
4955 724 : }
4956 :
4957 : /*
4958 : * stream_close_file
4959 : * Close the currently open file with streamed changes.
4960 : */
4961 : static void
4962 784 : stream_close_file(void)
4963 : {
4964 : Assert(stream_fd != NULL);
4965 :
4966 784 : BufFileClose(stream_fd);
4967 :
4968 784 : stream_fd = NULL;
4969 784 : }
4970 :
4971 : /*
4972 : * stream_write_change
4973 : * Serialize a change to a file for the current toplevel transaction.
4974 : *
4975 : * The change is serialized in a simple format, with length (not including
4976 : * the length), action code (identifying the message type) and message
4977 : * contents (without the subxact TransactionId value).
4978 : */
4979 : static void
4980 215106 : stream_write_change(char action, StringInfo s)
4981 : {
4982 : int len;
4983 :
4984 : Assert(stream_fd != NULL);
4985 :
4986 : /* total on-disk size, including the action type character */
4987 215106 : len = (s->len - s->cursor) + sizeof(char);
4988 :
4989 : /* first write the size */
4990 215106 : BufFileWrite(stream_fd, &len, sizeof(len));
4991 :
4992 : /* then the action */
4993 215106 : BufFileWrite(stream_fd, &action, sizeof(action));
4994 :
4995 : /* and finally the remaining part of the buffer (after the XID) */
4996 215106 : len = (s->len - s->cursor);
4997 :
4998 215106 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
4999 215106 : }
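/*
 * Editor's sketch (illustrative, not part of the original worker.c): one
 * plausible way to read back a single record in the on-disk format produced
 * by stream_write_change() above -- an int length (counting the action byte
 * plus the remaining message bytes), followed by exactly that many bytes.
 * The function name and the caller-supplied StringInfo are assumptions made
 * for this example only; the real reader lives elsewhere in this file.
 */
static void
example_read_one_change(BufFile *fd, StringInfo buf)
{
	int			len;

	/* length field: action byte + payload size */
	BufFileReadExact(fd, &len, sizeof(len));

	/* read the action byte and payload into the caller's buffer */
	resetStringInfo(buf);
	enlargeStringInfo(buf, len);
	BufFileReadExact(fd, buf->data, len);
	buf->len = len;
	buf->data[len] = '\0';
}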
5000 :
5001 : /*
5002 : * stream_open_and_write_change
5003 : * Serialize a message to a file for the given transaction.
5004 : *
5005 : * This function is similar to stream_write_change except that it will open the
5006 : * target file if not already before writing the message and close the file at
5007 : * the end.
5008 : */
5009 : static void
5010 10 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
5011 : {
5012 : Assert(!in_streamed_transaction);
5013 :
5014 10 : if (!stream_fd)
5015 10 : stream_start_internal(xid, false);
5016 :
5017 10 : stream_write_change(action, s);
5018 10 : stream_stop_internal(xid);
5019 10 : }
5020 :
5021 : /*
5022 : * Sets streaming options including replication slot name and origin start
5023 : * position. Workers need these options for logical replication.
5024 : */
5025 : void
5026 770 : set_stream_options(WalRcvStreamOptions *options,
5027 : char *slotname,
5028 : XLogRecPtr *origin_startpos)
5029 : {
5030 : int server_version;
5031 :
5032 770 : options->logical = true;
5033 770 : options->startpoint = *origin_startpos;
5034 770 : options->slotname = slotname;
5035 :
5036 770 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
5037 770 : options->proto.logical.proto_version =
5038 770 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
5039 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
5040 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
5041 : LOGICALREP_PROTO_VERSION_NUM;
5042 :
5043 770 : options->proto.logical.publication_names = MySubscription->publications;
5044 770 : options->proto.logical.binary = MySubscription->binary;
5045 :
5046 : /*
5047 : * Assign the appropriate value for the streaming option according to
5048 : * the 'streaming' mode and the publisher's ability to support that mode.
5049 : */
5050 770 : if (server_version >= 160000 &&
5051 770 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
5052 : {
5053 702 : options->proto.logical.streaming_str = "parallel";
5054 702 : MyLogicalRepWorker->parallel_apply = true;
5055 : }
5056 68 : else if (server_version >= 140000 &&
5057 68 : MySubscription->stream != LOGICALREP_STREAM_OFF)
5058 : {
5059 52 : options->proto.logical.streaming_str = "on";
5060 52 : MyLogicalRepWorker->parallel_apply = false;
5061 : }
5062 : else
5063 : {
5064 16 : options->proto.logical.streaming_str = NULL;
5065 16 : MyLogicalRepWorker->parallel_apply = false;
5066 : }
5067 :
5068 770 : options->proto.logical.twophase = false;
5069 770 : options->proto.logical.origin = pstrdup(MySubscription->origin);
5070 770 : }
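/*
 * Editor's summary (illustrative, not part of the original source): the
 * streaming request chosen above, by publisher version and subscription
 * setting:
 *
 *     publisher >= 16 and streaming = parallel            -> "parallel"
 *     publisher >= 14 and streaming = on (or parallel
 *     against a version-14/15 publisher)                   -> "on"
 *     otherwise                                            -> not requested
 */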
5071 :
5072 : /*
5073 : * Cleanup the memory for subxacts and reset the related variables.
5074 : */
5075 : static inline void
5076 750 : cleanup_subxact_info()
5077 : {
5078 750 : if (subxact_data.subxacts)
5079 174 : pfree(subxact_data.subxacts);
5080 :
5081 750 : subxact_data.subxacts = NULL;
5082 750 : subxact_data.subxact_last = InvalidTransactionId;
5083 750 : subxact_data.nsubxacts = 0;
5084 750 : subxact_data.nsubxacts_max = 0;
5085 750 : }
5086 :
5087 : /*
5088 : * Common function to run the apply loop with error handling. Disable the
5089 : * subscription, if necessary.
5090 : *
5091 : * Note that we don't handle FATAL errors, which are probably caused by
5092 : * system resource errors and are not repeatable.
5093 : */
5094 : void
5095 770 : start_apply(XLogRecPtr origin_startpos)
5096 : {
5097 770 : PG_TRY();
5098 : {
5099 770 : LogicalRepApplyLoop(origin_startpos);
5100 : }
5101 122 : PG_CATCH();
5102 : {
5103 : /*
5104 : * Reset the origin state to prevent the advancement of origin
5105 : * progress if we fail to apply. Otherwise, this will result in
5106 : * transaction loss as that transaction won't be sent again by the
5107 : * server.
5108 : */
5109 122 : replorigin_reset(0, (Datum) 0);
5110 :
5111 122 : if (MySubscription->disableonerr)
5112 6 : DisableSubscriptionAndExit();
5113 : else
5114 : {
5115 : /*
5116 : * Report the worker failed while applying changes. Abort the
5117 : * current transaction so that the stats message is sent in an
5118 : * idle state.
5119 : */
5120 116 : AbortOutOfAnyTransaction();
5121 116 : pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
5122 :
5123 116 : PG_RE_THROW();
5124 : }
5125 : }
5126 0 : PG_END_TRY();
5127 0 : }
5128 :
5129 : /*
5130 : * Runs the leader apply worker.
5131 : *
5132 : * It sets up replication origin, streaming options and then starts streaming.
5133 : */
5134 : static void
5135 472 : run_apply_worker()
5136 : {
5137 : char originname[NAMEDATALEN];
5138 472 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
5139 472 : char *slotname = NULL;
5140 : WalRcvStreamOptions options;
5141 : RepOriginId originid;
5142 : TimeLineID startpointTLI;
5143 : char *err;
5144 : bool must_use_password;
5145 :
5146 472 : slotname = MySubscription->slotname;
5147 :
5148 : /*
5149 : * This shouldn't happen if the subscription is enabled, but guard against
5150 : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
5151 : * slot is NULL.)
5152 : */
5153 472 : if (!slotname)
5154 0 : ereport(ERROR,
5155 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5156 : errmsg("subscription has no replication slot set")));
5157 :
5158 : /* Setup replication origin tracking. */
5159 472 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
5160 : originname, sizeof(originname));
5161 472 : StartTransactionCommand();
5162 472 : originid = replorigin_by_name(originname, true);
5163 472 : if (!OidIsValid(originid))
5164 0 : originid = replorigin_create(originname);
5165 472 : replorigin_session_setup(originid, 0);
5166 472 : replorigin_session_origin = originid;
5167 472 : origin_startpos = replorigin_session_get_progress(false);
5168 472 : CommitTransactionCommand();
5169 :
5170 : /* Is the use of a password mandatory? */
5171 906 : must_use_password = MySubscription->passwordrequired &&
5172 434 : !MySubscription->ownersuperuser;
5173 :
5174 472 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
5175 : true, must_use_password,
5176 : MySubscription->name, &err);
5177 :
5178 452 : if (LogRepWorkerWalRcvConn == NULL)
5179 48 : ereport(ERROR,
5180 : (errcode(ERRCODE_CONNECTION_FAILURE),
5181 : errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
5182 : MySubscription->name, err)));
5183 :
5184 : /*
5185 : * We don't really use the output of identify_system for anything, but it
5186 : * does some initializations on the upstream so let's still call it.
5187 : */
5188 404 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
5189 :
5190 404 : set_apply_error_context_origin(originname);
5191 :
5192 404 : set_stream_options(&options, slotname, &origin_startpos);
5193 :
5194 : /*
5195 : * Even when the two_phase mode is requested by the user, it remains as
5196 : * the tri-state PENDING until all tablesyncs have reached READY state.
5197 : * Only then can it become ENABLED.
5198 : *
5199 : * Note: If the subscription has no tables then leave the state as
5200 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
5201 : * work.
5202 : */
5203 436 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
5204 32 : AllTablesyncsReady())
5205 : {
5206 : /* Start streaming with two_phase enabled */
5207 18 : options.proto.logical.twophase = true;
5208 18 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5209 :
5210 18 : StartTransactionCommand();
5211 :
5212 : /*
5213 : * Updating pg_subscription might involve TOAST table access, so
5214 : * ensure we have a valid snapshot.
5215 : */
5216 18 : PushActiveSnapshot(GetTransactionSnapshot());
5217 :
5218 18 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
5219 18 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
5220 18 : PopActiveSnapshot();
5221 18 : CommitTransactionCommand();
5222 : }
5223 : else
5224 : {
5225 386 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
5226 : }
5227 :
5228 404 : ereport(DEBUG1,
5229 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
5230 : MySubscription->name,
5231 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
5232 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
5233 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
5234 : "?")));
5235 :
5236 : /* Run the main loop. */
5237 404 : start_apply(origin_startpos);
5238 0 : }
5239 :
5240 : /*
5241 : * Common initialization for leader apply worker, parallel apply worker and
5242 : * tablesync worker.
5243 : *
5244 : * Initialize the database connection, in-memory subscription and necessary
5245 : * config options.
5246 : */
5247 : void
5248 890 : InitializeLogRepWorker(void)
5249 : {
5250 : MemoryContext oldctx;
5251 :
5252 : /* Run as replica session replication role. */
5253 890 : SetConfigOption("session_replication_role", "replica",
5254 : PGC_SUSET, PGC_S_OVERRIDE);
5255 :
5256 : /* Connect to our database. */
5257 890 : BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
5258 890 : MyLogicalRepWorker->userid,
5259 : 0);
5260 :
5261 : /*
5262 : * Set always-secure search path, so malicious users can't redirect user
5263 : * code (e.g. pg_index.indexprs).
5264 : */
5265 884 : SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
5266 :
5267 : /* Load the subscription into persistent memory context. */
5268 884 : ApplyContext = AllocSetContextCreate(TopMemoryContext,
5269 : "ApplyContext",
5270 : ALLOCSET_DEFAULT_SIZES);
5271 884 : StartTransactionCommand();
5272 884 : oldctx = MemoryContextSwitchTo(ApplyContext);
5273 :
5274 884 : MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
5275 880 : if (!MySubscription)
5276 : {
5277 0 : ereport(LOG,
5278 : (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
5279 : MyLogicalRepWorker->subid)));
5280 :
5281 : /* Ensure we remove no-longer-useful entry for worker's start time */
5282 0 : if (am_leader_apply_worker())
5283 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5284 :
5285 0 : proc_exit(0);
5286 : }
5287 :
5288 880 : MySubscriptionValid = true;
5289 880 : MemoryContextSwitchTo(oldctx);
5290 :
5291 880 : if (!MySubscription->enabled)
5292 : {
5293 0 : ereport(LOG,
5294 : (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
5295 : MySubscription->name)));
5296 :
5297 0 : apply_worker_exit();
5298 : }
5299 :
5300 : /*
5301 : * Restart the worker if retain_dead_tuples was enabled during startup.
5302 : *
5303 : * At this point, the replication slot used for conflict detection might
5304 : * not exist yet, or could be dropped soon if the launcher perceives
5305 : * retain_dead_tuples as disabled. To avoid unnecessary tracking of
5306 : * oldest_nonremovable_xid when the slot is absent or at risk of being
5307 : * dropped, a restart is initiated.
5308 : *
5309 : * The oldest_nonremovable_xid should be initialized only when
5310 : * retain_dead_tuples is enabled before launching the worker. See
5311 : * logicalrep_worker_launch.
5312 : */
5313 880 : if (am_leader_apply_worker() &&
5314 472 : MySubscription->retaindeadtuples &&
5315 16 : !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
5316 : {
5317 0 : ereport(LOG,
5318 : errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
5319 : MySubscription->name, "retain_dead_tuples"));
5320 :
5321 0 : apply_worker_exit();
5322 : }
5323 :
5324 : /* Setup synchronous commit according to the user's wishes */
5325 880 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
5326 : PGC_BACKEND, PGC_S_OVERRIDE);
5327 :
5328 : /*
5329 : * Keep us informed about subscription or role changes. Note that the
5330 : * role's superuser privilege can be revoked.
5331 : */
5332 880 : CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
5333 : subscription_change_cb,
5334 : (Datum) 0);
5335 :
5336 880 : CacheRegisterSyscacheCallback(AUTHOID,
5337 : subscription_change_cb,
5338 : (Datum) 0);
5339 :
5340 880 : if (am_tablesync_worker())
5341 388 : ereport(LOG,
5342 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
5343 : MySubscription->name,
5344 : get_rel_name(MyLogicalRepWorker->relid))));
5345 : else
5346 492 : ereport(LOG,
5347 : (errmsg("logical replication apply worker for subscription \"%s\" has started",
5348 : MySubscription->name)));
5349 :
5350 880 : CommitTransactionCommand();
5351 880 : }
5352 :
5353 : /*
5354 : * Reset the origin state.
5355 : */
5356 : static void
5357 982 : replorigin_reset(int code, Datum arg)
5358 : {
5359 982 : replorigin_session_origin = InvalidRepOriginId;
5360 982 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
5361 982 : replorigin_session_origin_timestamp = 0;
5362 982 : }
5363 :
5364 : /* Common function to setup the leader apply or tablesync worker. */
5365 : void
5366 870 : SetupApplyOrSyncWorker(int worker_slot)
5367 : {
5368 : /* Attach to slot */
5369 870 : logicalrep_worker_attach(worker_slot);
5370 :
5371 : Assert(am_tablesync_worker() || am_leader_apply_worker());
5372 :
5373 : /* Setup signal handling */
5374 870 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
5375 870 : pqsignal(SIGTERM, die);
5376 870 : BackgroundWorkerUnblockSignals();
5377 :
5378 : /*
5379 : * We don't currently need any ResourceOwner in a walreceiver process, but
5380 : * if we did, we could call CreateAuxProcessResourceOwner here.
5381 : */
5382 :
5383 : /* Initialise stats to a reasonably sane value */
5384 870 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
5385 870 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
5386 :
5387 : /* Load the libpq-specific functions */
5388 870 : load_file("libpqwalreceiver", false);
5389 :
5390 870 : InitializeLogRepWorker();
5391 :
5392 : /*
5393 : * Register a callback to reset the origin state before aborting any
5394 : * pending transaction during shutdown (see ShutdownPostgres()). This will
5395 : * avoid origin advancement for an incomplete transaction which could
5396 : * otherwise lead to its loss as such a transaction won't be sent by the
5397 : * server again.
5398 : *
5399 : * Note that even a LOG or DEBUG statement placed after setting the origin
5400 : * state may process a shutdown signal before committing the current apply
5401 : * operation. So, it is important to register such a callback here.
5402 : */
5403 860 : before_shmem_exit(replorigin_reset, (Datum) 0);
5404 :
5405 : /* Connect to the origin and start the replication. */
5406 860 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
5407 : MySubscription->conninfo);
5408 :
5409 : /*
5410 : * Setup callback for syscache so that we know when something changes in
5411 : * the subscription relation state.
5412 : */
5413 860 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
5414 : invalidate_syncing_table_states,
5415 : (Datum) 0);
5416 860 : }
5417 :
5418 : /* Logical Replication Apply worker entry point */
5419 : void
5420 478 : ApplyWorkerMain(Datum main_arg)
5421 : {
5422 478 : int worker_slot = DatumGetInt32(main_arg);
5423 :
5424 478 : InitializingApplyWorker = true;
5425 :
5426 478 : SetupApplyOrSyncWorker(worker_slot);
5427 :
5428 472 : InitializingApplyWorker = false;
5429 :
5430 472 : run_apply_worker();
5431 :
5432 0 : proc_exit(0);
5433 : }
5434 :
5435 : /*
5436 : * After error recovery, disable the subscription in a new transaction
5437 : * and exit cleanly.
5438 : */
5439 : void
5440 8 : DisableSubscriptionAndExit(void)
5441 : {
5442 : /*
5443 : * Emit the error message, and recover from the error state to an idle
5444 : * state
5445 : */
5446 8 : HOLD_INTERRUPTS();
5447 :
5448 8 : EmitErrorReport();
5449 8 : AbortOutOfAnyTransaction();
5450 8 : FlushErrorState();
5451 :
5452 8 : RESUME_INTERRUPTS();
5453 :
5454 : /* Report the worker failed during either table synchronization or apply */
5455 8 : pgstat_report_subscription_error(MyLogicalRepWorker->subid,
5456 8 : !am_tablesync_worker());
5457 :
5458 : /* Disable the subscription */
5459 8 : StartTransactionCommand();
5460 :
5461 : /*
5462 : * Updating pg_subscription might involve TOAST table access, so ensure we
5463 : * have a valid snapshot.
5464 : */
5465 8 : PushActiveSnapshot(GetTransactionSnapshot());
5466 :
5467 8 : DisableSubscription(MySubscription->oid);
5468 8 : PopActiveSnapshot();
5469 8 : CommitTransactionCommand();
5470 :
5471 : /* Ensure we remove no-longer-useful entry for worker's start time */
5472 8 : if (am_leader_apply_worker())
5473 6 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
5474 :
5475 : /* Notify the subscription has been disabled and exit */
5476 8 : ereport(LOG,
5477 : errmsg("subscription \"%s\" has been disabled because of an error",
5478 : MySubscription->name));
5479 :
5480 : /*
5481 : * Skip the track_commit_timestamp check when disabling the worker due to
5482 : * an error, as verifying commit timestamps is unnecessary in this
5483 : * context.
5484 : */
5485 8 : if (MySubscription->retaindeadtuples)
5486 0 : CheckSubDeadTupleRetention(false, true, WARNING);
5487 :
5488 8 : proc_exit(0);
5489 : }
5490 :
5491 : /*
5492 : * Is current process a logical replication worker?
5493 : */
5494 : bool
5495 4006 : IsLogicalWorker(void)
5496 : {
5497 4006 : return MyLogicalRepWorker != NULL;
5498 : }
5499 :
5500 : /*
5501 : * Is current process a logical replication parallel apply worker?
5502 : */
5503 : bool
5504 2766 : IsLogicalParallelApplyWorker(void)
5505 : {
5506 2766 : return IsLogicalWorker() && am_parallel_apply_worker();
5507 : }
5508 :
5509 : /*
5510 : * Start skipping changes of the transaction if the given LSN matches the
5511 : * LSN specified by subscription's skiplsn.
5512 : */
5513 : static void
5514 1010 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
5515 : {
5516 : Assert(!is_skipping_changes());
5517 : Assert(!in_remote_transaction);
5518 : Assert(!in_streamed_transaction);
5519 :
5520 : /*
5521 : * Quick return if we are not requested to skip this transaction. This
5522 : * function is called for every remote transaction, and we assume that
5523 : * skipping transactions is not used often.
5524 : */
5525 1010 : if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
5526 : MySubscription->skiplsn != finish_lsn))
5527 1004 : return;
5528 :
5529 : /* Start skipping all changes of this transaction */
5530 6 : skip_xact_finish_lsn = finish_lsn;
5531 :
5532 6 : ereport(LOG,
5533 : errmsg("logical replication starts skipping transaction at LSN %X/%08X",
5534 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
5535 : }
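/*
 * Editor's note (illustrative): the skiplsn checked above is normally set by
 * the user, e.g. with something like
 *
 *     ALTER SUBSCRIPTION mysub SKIP (lsn = '0/14C0378');
 *
 * where the subscription name and LSN are hypothetical values; the finish
 * LSN of the failing remote transaction is reported in the apply worker's
 * error context message.
 */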
5536 :
5537 : /*
5538 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
5539 : */
5540 : static void
5541 54 : stop_skipping_changes(void)
5542 : {
5543 54 : if (!is_skipping_changes())
5544 48 : return;
5545 :
5546 6 : ereport(LOG,
5547 : errmsg("logical replication completed skipping transaction at LSN %X/%08X",
5548 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
5549 :
5550 : /* Stop skipping changes */
5551 6 : skip_xact_finish_lsn = InvalidXLogRecPtr;
5552 : }
5553 :
5554 : /*
5555 : * Clear subskiplsn of pg_subscription catalog.
5556 : *
5557 : * finish_lsn is the transaction's finish LSN that is used to check if the
5558 : * subskiplsn matches it. If not matched, we raise a warning when clearing the
5559 : * subskiplsn in order to inform users for cases e.g., where the user mistakenly
5560 : * specified the wrong subskiplsn.
5561 : */
5562 : static void
5563 1038 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
5564 : {
5565 : Relation rel;
5566 : Form_pg_subscription subform;
5567 : HeapTuple tup;
5568 1038 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
5569 1038 : bool started_tx = false;
5570 :
5571 1038 : if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
5572 1032 : return;
5573 :
5574 6 : if (!IsTransactionState())
5575 : {
5576 2 : StartTransactionCommand();
5577 2 : started_tx = true;
5578 : }
5579 :
5580 : /*
5581 : * Updating pg_subscription might involve TOAST table access, so ensure we
5582 : * have a valid snapshot.
5583 : */
5584 6 : PushActiveSnapshot(GetTransactionSnapshot());
5585 :
5586 : /*
5587 : * Protect subskiplsn of pg_subscription from being concurrently updated
5588 : * while clearing it.
5589 : */
5590 6 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
5591 : AccessShareLock);
5592 :
5593 6 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
5594 :
5595 : /* Fetch the existing tuple. */
5596 6 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
5597 : ObjectIdGetDatum(MySubscription->oid));
5598 :
5599 6 : if (!HeapTupleIsValid(tup))
5600 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
5601 :
5602 6 : subform = (Form_pg_subscription) GETSTRUCT(tup);
5603 :
5604 : /*
5605 : * Clear the subskiplsn. If the user has already changed subskiplsn before
5606 : * we clear it, we don't update the catalog and the replication origin
5607 : * state won't get advanced. So in the worst case, if the server crashes
5608 : * before sending an acknowledgment of the flush position, the transaction
5609 : * will be sent again and the user needs to set subskiplsn again. We could
5610 : * reduce the possibility by logging a replication origin WAL record to
5611 : * advance the origin LSN instead, but there is no way to advance the
5612 : * origin timestamp, and it doesn't seem worth doing anything about it
5613 : * since it's a very rare case.
5614 : */
5615 6 : if (subform->subskiplsn == myskiplsn)
5616 : {
5617 : bool nulls[Natts_pg_subscription];
5618 : bool replaces[Natts_pg_subscription];
5619 : Datum values[Natts_pg_subscription];
5620 :
5621 6 : memset(values, 0, sizeof(values));
5622 6 : memset(nulls, false, sizeof(nulls));
5623 6 : memset(replaces, false, sizeof(replaces));
5624 :
5625 : /* reset subskiplsn */
5626 6 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
5627 6 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
5628 :
5629 6 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
5630 : replaces);
5631 6 : CatalogTupleUpdate(rel, &tup->t_self, tup);
5632 :
5633 6 : if (myskiplsn != finish_lsn)
5634 0 : ereport(WARNING,
5635 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
5636 : errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
5637 : LSN_FORMAT_ARGS(finish_lsn),
5638 : LSN_FORMAT_ARGS(myskiplsn)));
5639 : }
5640 :
5641 6 : heap_freetuple(tup);
5642 6 : table_close(rel, NoLock);
5643 :
5644 6 : PopActiveSnapshot();
5645 :
5646 6 : if (started_tx)
5647 2 : CommitTransactionCommand();
5648 : }
5649 :
5650 : /* Error callback to give more context info about the change being applied */
5651 : void
5652 1502 : apply_error_callback(void *arg)
5653 : {
5654 1502 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
5655 :
5656 1502 : if (apply_error_callback_arg.command == 0)
5657 776 : return;
5658 :
5659 : Assert(errarg->origin_name);
5660 :
5661 726 : if (errarg->rel == NULL)
5662 : {
5663 624 : if (!TransactionIdIsValid(errarg->remote_xid))
5664 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
5665 : errarg->origin_name,
5666 : logicalrep_message_type(errarg->command));
5667 624 : else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5668 516 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
5669 : errarg->origin_name,
5670 : logicalrep_message_type(errarg->command),
5671 : errarg->remote_xid);
5672 : else
5673 216 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
5674 : errarg->origin_name,
5675 : logicalrep_message_type(errarg->command),
5676 : errarg->remote_xid,
5677 108 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5678 : }
5679 : else
5680 : {
5681 102 : if (errarg->remote_attnum < 0)
5682 : {
5683 102 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5684 4 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
5685 : errarg->origin_name,
5686 : logicalrep_message_type(errarg->command),
5687 2 : errarg->rel->remoterel.nspname,
5688 2 : errarg->rel->remoterel.relname,
5689 : errarg->remote_xid);
5690 : else
5691 200 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
5692 : errarg->origin_name,
5693 : logicalrep_message_type(errarg->command),
5694 100 : errarg->rel->remoterel.nspname,
5695 100 : errarg->rel->remoterel.relname,
5696 : errarg->remote_xid,
5697 100 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5698 : }
5699 : else
5700 : {
5701 0 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5702 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
5703 : errarg->origin_name,
5704 : logicalrep_message_type(errarg->command),
5705 0 : errarg->rel->remoterel.nspname,
5706 0 : errarg->rel->remoterel.relname,
5707 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
5708 : errarg->remote_xid);
5709 : else
5710 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
5711 : errarg->origin_name,
5712 : logicalrep_message_type(errarg->command),
5713 0 : errarg->rel->remoterel.nspname,
5714 0 : errarg->rel->remoterel.relname,
5715 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
5716 : errarg->remote_xid,
5717 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5718 : }
5719 : }
5720 : }
5721 :
5722 : /* Set transaction information of apply error callback */
5723 : static inline void
5724 5724 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
5725 : {
5726 5724 : apply_error_callback_arg.remote_xid = xid;
5727 5724 : apply_error_callback_arg.finish_lsn = lsn;
5728 5724 : }
5729 :
5730 : /* Reset all information of apply error callback */
5731 : static inline void
5732 2822 : reset_apply_error_context_info(void)
5733 : {
5734 2822 : apply_error_callback_arg.command = 0;
5735 2822 : apply_error_callback_arg.rel = NULL;
5736 2822 : apply_error_callback_arg.remote_attnum = -1;
5737 2822 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
5738 2822 : }
5739 :
5740 : /*
5741 : * Request wakeup of the workers for the given subscription OID
5742 : * at commit of the current transaction.
5743 : *
5744 : * This is used to ensure that the workers process assorted changes
5745 : * as soon as possible.
5746 : */
5747 : void
5748 406 : LogicalRepWorkersWakeupAtCommit(Oid subid)
5749 : {
5750 : MemoryContext oldcxt;
5751 :
5752 406 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
5753 406 : on_commit_wakeup_workers_subids =
5754 406 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
5755 406 : MemoryContextSwitchTo(oldcxt);
5756 406 : }
5757 :
5758 : /*
5759 : * Wake up the workers of any subscriptions that were changed in this xact.
5760 : */
5761 : void
5762 1061494 : AtEOXact_LogicalRepWorkers(bool isCommit)
5763 : {
5764 1061494 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
5765 : {
5766 : ListCell *lc;
5767 :
5768 396 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5769 792 : foreach(lc, on_commit_wakeup_workers_subids)
5770 : {
5771 396 : Oid subid = lfirst_oid(lc);
5772 : List *workers;
5773 : ListCell *lc2;
5774 :
5775 396 : workers = logicalrep_workers_find(subid, true, false);
5776 514 : foreach(lc2, workers)
5777 : {
5778 118 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5779 :
5780 118 : logicalrep_worker_wakeup_ptr(worker);
5781 : }
5782 : }
5783 396 : LWLockRelease(LogicalRepWorkerLock);
5784 : }
5785 :
5786 : /* The List storage will be reclaimed automatically in xact cleanup. */
5787 1061494 : on_commit_wakeup_workers_subids = NIL;
5788 1061494 : }
5789 :
5790 : /*
5791 : * Allocate the origin name in long-lived context for error context message.
5792 : */
5793 : void
5794 790 : set_apply_error_context_origin(char *originname)
5795 : {
5796 790 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
5797 : originname);
5798 790 : }
5799 :
5800 : /*
5801 : * Return the action to be taken for the given transaction. See
5802 : * TransApplyAction for information on each of the actions.
5803 : *
5804 : * *winfo is assigned to the destination parallel worker info when the leader
5805 : * apply worker has to pass all the transaction's changes to the parallel
5806 : * apply worker.
5807 : */
5808 : static TransApplyAction
5809 652402 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
5810 : {
5811 652402 : *winfo = NULL;
5812 :
5813 652402 : if (am_parallel_apply_worker())
5814 : {
5815 137824 : return TRANS_PARALLEL_APPLY;
5816 : }
5817 :
5818 : /*
5819 : * If we are processing this transaction using a parallel apply worker,
5820 : * we either send the changes to the parallel worker or, if that worker
5821 : * is busy, serialize the changes to a file which will later be
5822 : * processed by the parallel worker.
5823 : */
5824 514578 : *winfo = pa_find_worker(xid);
5825 :
5826 514578 : if (*winfo && (*winfo)->serialize_changes)
5827 : {
5828 10074 : return TRANS_LEADER_PARTIAL_SERIALIZE;
5829 : }
5830 504504 : else if (*winfo)
5831 : {
5832 137792 : return TRANS_LEADER_SEND_TO_PARALLEL;
5833 : }
5834 :
5835 : /*
5836 : * If there is no parallel worker involved in processing this transaction,
5837 : * then we either apply the change directly or serialize it to a file,
5838 : * which will later be applied when the transaction finish message is
5839 : * processed.
5840 : */
5841 366712 : else if (in_streamed_transaction)
5842 : {
5843 206392 : return TRANS_LEADER_SERIALIZE;
5844 : }
5845 : else
5846 : {
5847 160320 : return TRANS_LEADER_APPLY;
5848 : }
5849 : }
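/*
 * Editor's summary (illustrative, not part of the original source): the
 * decision above, restated as a table:
 *
 *     running as a parallel apply worker                -> TRANS_PARALLEL_APPLY
 *     leader, parallel worker assigned, serializing     -> TRANS_LEADER_PARTIAL_SERIALIZE
 *     leader, parallel worker assigned                  -> TRANS_LEADER_SEND_TO_PARALLEL
 *     leader, streamed transaction, no parallel worker  -> TRANS_LEADER_SERIALIZE
 *     leader, non-streamed transaction                  -> TRANS_LEADER_APPLY
 */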
|