Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server facing code and shares libpqwalreceiver
19 : * module with walreceiver for providing the libpq specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option as on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions has
33 : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which is then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in tmp file directory by default, and the filenames
38 : * include both the XID of the toplevel transaction and OID of the
39 : * subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID doesn't interfere.
41 : *
42 : * We use BufFiles instead of using normal temporary files because (a) the
43 : * BufFile infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) provides a way for automatic clean up on the error and (c) provides
45 : * a way to survive these files across local transactions and allow to open and
46 : * close at stream start and close. We decided to use FileSet
47 : * infrastructure as without that it deletes the files on the closure of the
48 : * file and if we decide to keep stream files open across the start/stop stream
49 : * then it will consume a lot of memory (more than 8K for each BufFile and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we don't use FileSet then we also need to invent
53 : * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 : * the file we desired across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option as parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : * tablesync worker might not get such a prepared transaction because say it
72 : * was prior to the initial consistent point but might have got some later
73 : * commits. Now, the tablesync worker will exit without doing anything for the
74 : * prepared transaction skipped by the apply worker as the sync location for it
75 : * will be already ahead of the apply worker's current location. This would lead
76 : * to an "empty prepare", because later when the apply worker does the commit
77 : * prepare, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this, and similar prepare confusions the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * process_syncing_tables_for_apply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING it start streaming messages with the
98 : * two_phase option which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time we have not enabled two_phase, the
101 : * publisher (replication server) would have skipped some prepares but we
102 : * ensure that such prepares are sent along with commit prepare, see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * We don't allow to toggle two_phase option of a subscription because it can
113 : * lead to an inconsistent replica. Consider, initially, it was on and we have
114 : * received some prepare then we turn it off, now at commit time the server
115 : * will send the entire transaction data along with the commit. With some more
116 : * analysis, we can allow changing this option from off to on but not sure if
117 : * that alone would be useful.
118 : *
119 : * Finally, to avoid problems mentioned in previous paragraphs from any
120 : * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
121 : * to 'off' and then again back to 'on') there is a restriction for
122 : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
123 : * the two_phase tri-state is ENABLED, except when copy_data = false.
124 : *
125 : * We can get prepare of the same GID more than once for the genuine cases
126 : * where we have defined multiple subscriptions for publications on the same
127 : * server and prepared transaction has operations on tables subscribed to those
128 : * subscriptions. For such cases, if we use the GID sent by publisher one of
129 : * the prepares will be successful and others will fail, in which case the
130 : * server will send them again. Now, this can lead to a deadlock if user has
131 : * set synchronous_standby_names for all the subscriptions on subscriber. To
132 : * avoid such deadlocks, we generate a unique GID (consisting of the
133 : * subscription oid and the xid of the prepared transaction) for each prepare
134 : * transaction on the subscriber.
135 : *
136 : * FAILOVER
137 : * ----------------------
138 : * The logical slot on the primary can be synced to the standby by specifying
139 : * failover = true when creating the subscription. Enabling failover allows us
140 : * to smoothly transition to the promoted standby, ensuring that we can
141 : * subscribe to the new primary without losing any data.
142 : *-------------------------------------------------------------------------
143 : */
144 :
145 : #include "postgres.h"
146 :
147 : #include <sys/stat.h>
148 : #include <unistd.h>
149 :
150 : #include "access/table.h"
151 : #include "access/tableam.h"
152 : #include "access/twophase.h"
153 : #include "access/xact.h"
154 : #include "catalog/indexing.h"
155 : #include "catalog/pg_inherits.h"
156 : #include "catalog/pg_subscription.h"
157 : #include "catalog/pg_subscription_rel.h"
158 : #include "commands/tablecmds.h"
159 : #include "commands/trigger.h"
160 : #include "executor/executor.h"
161 : #include "executor/execPartition.h"
162 : #include "libpq/pqformat.h"
163 : #include "miscadmin.h"
164 : #include "optimizer/optimizer.h"
165 : #include "parser/parse_relation.h"
166 : #include "pgstat.h"
167 : #include "postmaster/bgworker.h"
168 : #include "postmaster/interrupt.h"
169 : #include "postmaster/walwriter.h"
170 : #include "replication/conflict.h"
171 : #include "replication/logicallauncher.h"
172 : #include "replication/logicalproto.h"
173 : #include "replication/logicalrelation.h"
174 : #include "replication/logicalworker.h"
175 : #include "replication/origin.h"
176 : #include "replication/walreceiver.h"
177 : #include "replication/worker_internal.h"
178 : #include "rewrite/rewriteHandler.h"
179 : #include "storage/buffile.h"
180 : #include "storage/ipc.h"
181 : #include "storage/lmgr.h"
182 : #include "tcop/tcopprot.h"
183 : #include "utils/acl.h"
184 : #include "utils/dynahash.h"
185 : #include "utils/guc.h"
186 : #include "utils/inval.h"
187 : #include "utils/lsyscache.h"
188 : #include "utils/memutils.h"
189 : #include "utils/pg_lsn.h"
190 : #include "utils/rel.h"
191 : #include "utils/rls.h"
192 : #include "utils/snapmgr.h"
193 : #include "utils/syscache.h"
194 : #include "utils/usercontext.h"
195 :
196 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
197 :
198 : typedef struct FlushPosition
199 : {
200 : dlist_node node;
201 : XLogRecPtr local_end;
202 : XLogRecPtr remote_end;
203 : } FlushPosition;
204 :
205 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
206 :
207 : typedef struct ApplyExecutionData
208 : {
209 : EState *estate; /* executor state, used to track resources */
210 :
211 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
212 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
213 :
214 : /* These fields are used when the target relation is partitioned: */
215 : ModifyTableState *mtstate; /* dummy ModifyTable state */
216 : PartitionTupleRouting *proute; /* partition routing info */
217 : } ApplyExecutionData;
218 :
219 : /* Struct for saving and restoring apply errcontext information */
220 : typedef struct ApplyErrorCallbackArg
221 : {
222 : LogicalRepMsgType command; /* 0 if invalid */
223 : LogicalRepRelMapEntry *rel;
224 :
225 : /* Remote node information */
226 : int remote_attnum; /* -1 if invalid */
227 : TransactionId remote_xid;
228 : XLogRecPtr finish_lsn;
229 : char *origin_name;
230 : } ApplyErrorCallbackArg;
231 :
232 : /*
233 : * The action to be taken for the changes in the transaction.
234 : *
235 : * TRANS_LEADER_APPLY:
236 : * This action means that we are in the leader apply worker or table sync
237 : * worker. The changes of the transaction are either directly applied or
238 : * are read from temporary files (for streaming transactions) and then
239 : * applied by the worker.
240 : *
241 : * TRANS_LEADER_SERIALIZE:
242 : * This action means that we are in the leader apply worker or table sync
243 : * worker. Changes are written to temporary files and then applied when the
244 : * final commit arrives.
245 : *
246 : * TRANS_LEADER_SEND_TO_PARALLEL:
247 : * This action means that we are in the leader apply worker and need to send
248 : * the changes to the parallel apply worker.
249 : *
250 : * TRANS_LEADER_PARTIAL_SERIALIZE:
251 : * This action means that we are in the leader apply worker and have sent some
252 : * changes directly to the parallel apply worker and the remaining changes are
253 : * serialized to a file, due to timeout while sending data. The parallel apply
254 : * worker will apply these serialized changes when the final commit arrives.
255 : *
256 : * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
257 : * serializing changes, the leader worker also needs to serialize the
258 : * STREAM_XXX message to a file, and wait for the parallel apply worker to
259 : * finish the transaction when processing the transaction finish command. So
260 : * this new action was introduced to keep the code and logic clear.
261 : *
262 : * TRANS_PARALLEL_APPLY:
263 : * This action means that we are in the parallel apply worker and changes of
264 : * the transaction are applied directly by the worker.
265 : */
266 : typedef enum
267 : {
268 : /* The action for non-streaming transactions. */
269 : TRANS_LEADER_APPLY,
270 :
271 : /* Actions for streaming transactions. */
272 : TRANS_LEADER_SERIALIZE,
273 : TRANS_LEADER_SEND_TO_PARALLEL,
274 : TRANS_LEADER_PARTIAL_SERIALIZE,
275 : TRANS_PARALLEL_APPLY,
276 : } TransApplyAction;
277 :
278 : /* errcontext tracker */
279 : static ApplyErrorCallbackArg apply_error_callback_arg =
280 : {
281 : .command = 0,
282 : .rel = NULL,
283 : .remote_attnum = -1,
284 : .remote_xid = InvalidTransactionId,
285 : .finish_lsn = InvalidXLogRecPtr,
286 : .origin_name = NULL,
287 : };
288 :
289 : ErrorContextCallback *apply_error_context_stack = NULL;
290 :
291 : MemoryContext ApplyMessageContext = NULL;
292 : MemoryContext ApplyContext = NULL;
293 :
294 : /* per stream context for streaming transactions */
295 : static MemoryContext LogicalStreamingContext = NULL;
296 :
297 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
298 :
299 : Subscription *MySubscription = NULL;
300 : static bool MySubscriptionValid = false;
301 :
302 : static List *on_commit_wakeup_workers_subids = NIL;
303 :
304 : bool in_remote_transaction = false;
305 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
306 :
307 : /* fields valid only when processing streamed transaction */
308 : static bool in_streamed_transaction = false;
309 :
310 : static TransactionId stream_xid = InvalidTransactionId;
311 :
312 : /*
313 : * The number of changes applied by parallel apply worker during one streaming
314 : * block.
315 : */
316 : static uint32 parallel_stream_nchanges = 0;
317 :
318 : /* Are we initializing an apply worker? */
319 : bool InitializingApplyWorker = false;
320 :
321 : /*
322 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
323 : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
324 : * Once we start skipping changes, we don't stop it until we skip all changes of
325 : * the transaction even if pg_subscription is updated and MySubscription->skiplsn
326 : * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
327 : * we don't skip receiving and spooling the changes since we decide whether or not
328 : * to skip applying the changes when starting to apply changes. The subskiplsn is
329 : * cleared after successfully skipping the transaction or applying non-empty
330 : * transaction. The latter prevents the mistakenly specified subskiplsn from
331 : * being left. Note that we cannot skip the streaming transactions when using
332 : * parallel apply workers because we cannot get the finish LSN before applying
333 : * the changes. So, we don't start parallel apply worker when finish LSN is set
334 : * by the user.
335 : */
336 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
337 : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
338 :
339 : /* BufFile handle of the current streaming file */
340 : static BufFile *stream_fd = NULL;
341 :
342 : typedef struct SubXactInfo
343 : {
344 : TransactionId xid; /* XID of the subxact */
345 : int fileno; /* file number in the buffile */
346 : off_t offset; /* offset in the file */
347 : } SubXactInfo;
348 :
349 : /* Sub-transaction data for the current streaming transaction */
350 : typedef struct ApplySubXactData
351 : {
352 : uint32 nsubxacts; /* number of sub-transactions */
353 : uint32 nsubxacts_max; /* current capacity of subxacts */
354 : TransactionId subxact_last; /* xid of the last sub-transaction */
355 : SubXactInfo *subxacts; /* sub-xact offset in changes file */
356 : } ApplySubXactData;
357 :
358 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
359 :
360 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
361 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
362 :
363 : /*
364 : * Information about subtransactions of a given toplevel transaction.
365 : */
366 : static void subxact_info_write(Oid subid, TransactionId xid);
367 : static void subxact_info_read(Oid subid, TransactionId xid);
368 : static void subxact_info_add(TransactionId xid);
369 : static inline void cleanup_subxact_info(void);
370 :
371 : /*
372 : * Serialize and deserialize changes for a toplevel transaction.
373 : */
374 : static void stream_open_file(Oid subid, TransactionId xid,
375 : bool first_segment);
376 : static void stream_write_change(char action, StringInfo s);
377 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
378 : static void stream_close_file(void);
379 :
380 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
381 :
382 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
383 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
384 : ResultRelInfo *relinfo,
385 : TupleTableSlot *remoteslot);
386 : static void apply_handle_update_internal(ApplyExecutionData *edata,
387 : ResultRelInfo *relinfo,
388 : TupleTableSlot *remoteslot,
389 : LogicalRepTupleData *newtup,
390 : Oid localindexoid);
391 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
392 : ResultRelInfo *relinfo,
393 : TupleTableSlot *remoteslot,
394 : Oid localindexoid);
395 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
396 : LogicalRepRelation *remoterel,
397 : Oid localidxoid,
398 : TupleTableSlot *remoteslot,
399 : TupleTableSlot **localslot);
400 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
401 : TupleTableSlot *remoteslot,
402 : LogicalRepTupleData *newtup,
403 : CmdType operation);
404 :
405 : /* Functions for skipping changes */
406 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
407 : static void stop_skipping_changes(void);
408 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
409 :
410 : /* Functions for apply error callback */
411 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
412 : static inline void reset_apply_error_context_info(void);
413 :
414 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
415 : ParallelApplyWorkerInfo **winfo);
416 :
417 : static void replorigin_reset(int code, Datum arg);
418 :
419 : /*
420 : * Form the origin name for the subscription.
421 : *
422 : * This is a common function for tablesync and other workers. Tablesync workers
423 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
424 : *
425 : * Return the name in the supplied buffer.
426 : */
427 : void
428 2480 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
429 : char *originname, Size szoriginname)
430 : {
431 2480 : if (OidIsValid(relid))
432 : {
433 : /* Replication origin name for tablesync workers. */
434 1484 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
435 : }
436 : else
437 : {
438 : /* Replication origin name for non-tablesync workers. */
439 996 : snprintf(originname, szoriginname, "pg_%u", suboid);
440 : }
441 2480 : }
442 :
443 : /*
444 : * Should this worker apply changes for given relation.
445 : *
446 : * This is mainly needed for initial relation data sync as that runs in
447 : * separate worker process running in parallel and we need some way to skip
448 : * changes coming to the leader apply worker during the sync of a table.
449 : *
450 : * Note we need to do smaller or equals comparison for SYNCDONE state because
451 : * it might hold position of end of initial slot consistent point WAL
452 : * record + 1 (ie start of next record) and next record can be COMMIT of
453 : * transaction we are now processing (which is what we set remote_final_lsn
454 : * to in apply_handle_begin).
455 : *
456 : * Note that for streaming transactions that are being applied in the parallel
457 : * apply worker, we disallow applying changes if the target table in the
458 : * subscription is not in the READY state, because we cannot decide whether to
459 : * apply the change as we won't know remote_final_lsn by that time.
460 : *
461 : * We already checked this in pa_can_start() before assigning the
462 : * streaming transaction to the parallel worker, but it also needs to be
463 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
464 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
465 : * while applying this transaction.
466 : */
467 : static bool
468 297122 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
469 : {
470 297122 : switch (MyLogicalRepWorker->type)
471 : {
472 0 : case WORKERTYPE_TABLESYNC:
473 0 : return MyLogicalRepWorker->relid == rel->localreloid;
474 :
475 137660 : case WORKERTYPE_PARALLEL_APPLY:
476 : /* We don't synchronize rel's that are in unknown state. */
477 137660 : if (rel->state != SUBREL_STATE_READY &&
478 0 : rel->state != SUBREL_STATE_UNKNOWN)
479 0 : ereport(ERROR,
480 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
481 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
482 : MySubscription->name),
483 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
484 :
485 137660 : return rel->state == SUBREL_STATE_READY;
486 :
487 159462 : case WORKERTYPE_APPLY:
488 159560 : return (rel->state == SUBREL_STATE_READY ||
489 98 : (rel->state == SUBREL_STATE_SYNCDONE &&
490 0 : rel->statelsn <= remote_final_lsn));
491 :
492 0 : case WORKERTYPE_UNKNOWN:
493 : /* Should never happen. */
494 0 : elog(ERROR, "Unknown worker type");
495 : }
496 :
497 0 : return false; /* dummy for compiler */
498 : }
499 :
500 : /*
501 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
502 : *
503 : * Start a transaction, if this is the first step (else we keep using the
504 : * existing transaction).
505 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
506 : */
507 : static void
508 298030 : begin_replication_step(void)
509 : {
510 298030 : SetCurrentStatementStartTimestamp();
511 :
512 298030 : if (!IsTransactionState())
513 : {
514 1854 : StartTransactionCommand();
515 1854 : maybe_reread_subscription();
516 : }
517 :
518 298024 : PushActiveSnapshot(GetTransactionSnapshot());
519 :
520 298024 : MemoryContextSwitchTo(ApplyMessageContext);
521 298024 : }
522 :
523 : /*
524 : * Finish up one step of a replication transaction.
525 : * Callers of begin_replication_step() must also call this.
526 : *
527 : * We don't close out the transaction here, but we should increment
528 : * the command counter to make the effects of this step visible.
529 : */
530 : static void
531 297968 : end_replication_step(void)
532 : {
533 297968 : PopActiveSnapshot();
534 :
535 297968 : CommandCounterIncrement();
536 297968 : }
537 :
538 : /*
539 : * Handle streamed transactions for both the leader apply worker and the
540 : * parallel apply workers.
541 : *
542 : * In the streaming case (receiving a block of the streamed transaction), for
543 : * serialize mode, simply redirect it to a file for the proper toplevel
544 : * transaction, and for parallel mode, the leader apply worker will send the
545 : * changes to parallel apply workers and the parallel apply worker will define
546 : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
547 : * messages will be applied by both leader apply worker and parallel apply
548 : * workers).
549 : *
550 : * Returns true for streamed transactions (when the change is either serialized
551 : * to file or sent to parallel apply worker), false otherwise (regular mode or
552 : * needs to be processed by parallel apply worker).
553 : *
554 : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
555 : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
556 : * to a parallel apply worker.
557 : */
558 : static bool
559 649704 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
560 : {
561 : TransactionId current_xid;
562 : ParallelApplyWorkerInfo *winfo;
563 : TransApplyAction apply_action;
564 : StringInfoData original_msg;
565 :
566 649704 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
567 :
568 : /* not in streaming mode */
569 649704 : if (apply_action == TRANS_LEADER_APPLY)
570 160180 : return false;
571 :
572 : Assert(TransactionIdIsValid(stream_xid));
573 :
574 : /*
575 : * The parallel apply worker needs the xid in this message to decide
576 : * whether to define a savepoint, so save the original message that has
577 : * not moved the cursor after the xid. We will serialize this message to a
578 : * file in PARTIAL_SERIALIZE mode.
579 : */
580 489524 : original_msg = *s;
581 :
582 : /*
583 : * We should have received XID of the subxact as the first part of the
584 : * message, so extract it.
585 : */
586 489524 : current_xid = pq_getmsgint(s, 4);
587 :
588 489524 : if (!TransactionIdIsValid(current_xid))
589 0 : ereport(ERROR,
590 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
591 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
592 :
593 489524 : switch (apply_action)
594 : {
595 205024 : case TRANS_LEADER_SERIALIZE:
596 : Assert(stream_fd);
597 :
598 : /* Add the new subxact to the array (unless already there). */
599 205024 : subxact_info_add(current_xid);
600 :
601 : /* Write the change to the current file */
602 205024 : stream_write_change(action, s);
603 205024 : return true;
604 :
605 136772 : case TRANS_LEADER_SEND_TO_PARALLEL:
606 : Assert(winfo);
607 :
608 : /*
609 : * XXX The publisher side doesn't always send relation/type update
610 : * messages after the streaming transaction, so also update the
611 : * relation/type in leader apply worker. See function
612 : * cleanup_rel_sync_cache.
613 : */
614 136772 : if (pa_send_data(winfo, s->len, s->data))
615 136772 : return (action != LOGICAL_REP_MSG_RELATION &&
616 : action != LOGICAL_REP_MSG_TYPE);
617 :
618 : /*
619 : * Switch to serialize mode when we are not able to send the
620 : * change to parallel apply worker.
621 : */
622 0 : pa_switch_to_partial_serialize(winfo, false);
623 :
624 : /* fall through */
625 10012 : case TRANS_LEADER_PARTIAL_SERIALIZE:
626 10012 : stream_write_change(action, &original_msg);
627 :
628 : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
629 10012 : return (action != LOGICAL_REP_MSG_RELATION &&
630 : action != LOGICAL_REP_MSG_TYPE);
631 :
632 137716 : case TRANS_PARALLEL_APPLY:
633 137716 : parallel_stream_nchanges += 1;
634 :
635 : /* Define a savepoint for a subxact if needed. */
636 137716 : pa_start_subtrans(current_xid, stream_xid);
637 137716 : return false;
638 :
639 0 : default:
640 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
641 : return false; /* silence compiler warning */
642 : }
643 : }
644 :
645 : /*
646 : * Executor state preparation for evaluation of constraint expressions,
647 : * indexes and triggers for the specified relation.
648 : *
649 : * Note that the caller must open and close any indexes to be updated.
650 : */
651 : static ApplyExecutionData *
652 296966 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
653 : {
654 : ApplyExecutionData *edata;
655 : EState *estate;
656 : RangeTblEntry *rte;
657 296966 : List *perminfos = NIL;
658 : ResultRelInfo *resultRelInfo;
659 :
660 296966 : edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
661 296966 : edata->targetRel = rel;
662 :
663 296966 : edata->estate = estate = CreateExecutorState();
664 :
665 296966 : rte = makeNode(RangeTblEntry);
666 296966 : rte->rtekind = RTE_RELATION;
667 296966 : rte->relid = RelationGetRelid(rel->localrel);
668 296966 : rte->relkind = rel->localrel->rd_rel->relkind;
669 296966 : rte->rellockmode = AccessShareLock;
670 :
671 296966 : addRTEPermissionInfo(&perminfos, rte);
672 :
673 296966 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
674 : bms_make_singleton(1));
675 :
676 296966 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
677 :
678 : /*
679 : * Use Relation opened by logicalrep_rel_open() instead of opening it
680 : * again.
681 : */
682 296966 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
683 :
684 : /*
685 : * We put the ResultRelInfo in the es_opened_result_relations list, even
686 : * though we don't populate the es_result_relations array. That's a bit
687 : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
688 : *
689 : * ExecOpenIndices() is not called here either, each execution path doing
690 : * an apply operation being responsible for that.
691 : */
692 296966 : estate->es_opened_result_relations =
693 296966 : lappend(estate->es_opened_result_relations, resultRelInfo);
694 :
695 296966 : estate->es_output_cid = GetCurrentCommandId(true);
696 :
697 : /* Prepare to catch AFTER triggers. */
698 296966 : AfterTriggerBeginQuery();
699 :
700 : /* other fields of edata remain NULL for now */
701 :
702 296966 : return edata;
703 : }
704 :
705 : /*
706 : * Finish any operations related to the executor state created by
707 : * create_edata_for_relation().
708 : */
709 : static void
710 296924 : finish_edata(ApplyExecutionData *edata)
711 : {
712 296924 : EState *estate = edata->estate;
713 :
714 : /* Handle any queued AFTER triggers. */
715 296924 : AfterTriggerEndQuery(estate);
716 :
717 : /* Shut down tuple routing, if any was done. */
718 296924 : if (edata->proute)
719 148 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
720 :
721 : /*
722 : * Cleanup. It might seem that we should call ExecCloseResultRelations()
723 : * here, but we intentionally don't. It would close the rel we added to
724 : * es_opened_result_relations above, which is wrong because we took no
725 : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
726 : * any other relations opened during execution.
727 : */
728 296924 : ExecResetTupleTable(estate->es_tupleTable, false);
729 296924 : FreeExecutorState(estate);
730 296924 : pfree(edata);
731 296924 : }
732 :
733 : /*
734 : * Executes default values for columns for which we can't map to remote
735 : * relation columns.
736 : *
737 : * This allows us to support tables which have more columns on the downstream
738 : * than on the upstream.
739 : */
740 : static void
741 152462 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
742 : TupleTableSlot *slot)
743 : {
744 152462 : TupleDesc desc = RelationGetDescr(rel->localrel);
745 152462 : int num_phys_attrs = desc->natts;
746 : int i;
747 : int attnum,
748 152462 : num_defaults = 0;
749 : int *defmap;
750 : ExprState **defexprs;
751 : ExprContext *econtext;
752 :
753 152462 : econtext = GetPerTupleExprContext(estate);
754 :
755 : /* We got all the data via replication, no need to evaluate anything. */
756 152462 : if (num_phys_attrs == rel->remoterel.natts)
757 72172 : return;
758 :
759 80290 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
760 80290 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
761 :
762 : Assert(rel->attrmap->maplen == num_phys_attrs);
763 421326 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
764 : {
765 : Expr *defexpr;
766 :
767 341036 : if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
768 18 : continue;
769 :
770 341018 : if (rel->attrmap->attnums[attnum] >= 0)
771 184536 : continue;
772 :
773 156482 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
774 :
775 156482 : if (defexpr != NULL)
776 : {
777 : /* Run the expression through planner */
778 140262 : defexpr = expression_planner(defexpr);
779 :
780 : /* Initialize executable expression in copycontext */
781 140262 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
782 140262 : defmap[num_defaults] = attnum;
783 140262 : num_defaults++;
784 : }
785 : }
786 :
787 220552 : for (i = 0; i < num_defaults; i++)
788 140262 : slot->tts_values[defmap[i]] =
789 140262 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
790 : }
791 :
792 : /*
793 : * Store tuple data into slot.
794 : *
795 : * Incoming data can be either text or binary format.
796 : */
797 : static void
798 296984 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
799 : LogicalRepTupleData *tupleData)
800 : {
801 296984 : int natts = slot->tts_tupleDescriptor->natts;
802 : int i;
803 :
804 296984 : ExecClearTuple(slot);
805 :
806 : /* Call the "in" function for each non-dropped, non-null attribute */
807 : Assert(natts == rel->attrmap->maplen);
808 1316906 : for (i = 0; i < natts; i++)
809 : {
810 1019922 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
811 1019922 : int remoteattnum = rel->attrmap->attnums[i];
812 :
813 1019922 : if (!att->attisdropped && remoteattnum >= 0)
814 606088 : {
815 606088 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
816 :
817 : Assert(remoteattnum < tupleData->ncols);
818 :
819 : /* Set attnum for error callback */
820 606088 : apply_error_callback_arg.remote_attnum = remoteattnum;
821 :
822 606088 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
823 : {
824 : Oid typinput;
825 : Oid typioparam;
826 :
827 284544 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
828 569088 : slot->tts_values[i] =
829 284544 : OidInputFunctionCall(typinput, colvalue->data,
830 : typioparam, att->atttypmod);
831 284544 : slot->tts_isnull[i] = false;
832 : }
833 321544 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
834 : {
835 : Oid typreceive;
836 : Oid typioparam;
837 :
838 : /*
839 : * In some code paths we may be asked to re-parse the same
840 : * tuple data. Reset the StringInfo's cursor so that works.
841 : */
842 220890 : colvalue->cursor = 0;
843 :
844 220890 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
845 441780 : slot->tts_values[i] =
846 220890 : OidReceiveFunctionCall(typreceive, colvalue,
847 : typioparam, att->atttypmod);
848 :
849 : /* Trouble if it didn't eat the whole buffer */
850 220890 : if (colvalue->cursor != colvalue->len)
851 0 : ereport(ERROR,
852 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
853 : errmsg("incorrect binary data format in logical replication column %d",
854 : remoteattnum + 1)));
855 220890 : slot->tts_isnull[i] = false;
856 : }
857 : else
858 : {
859 : /*
860 : * NULL value from remote. (We don't expect to see
861 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
862 : * NULL.)
863 : */
864 100654 : slot->tts_values[i] = (Datum) 0;
865 100654 : slot->tts_isnull[i] = true;
866 : }
867 :
868 : /* Reset attnum for error callback */
869 606088 : apply_error_callback_arg.remote_attnum = -1;
870 : }
871 : else
872 : {
873 : /*
874 : * We assign NULL to dropped attributes and missing values
875 : * (missing values should be later filled using
876 : * slot_fill_defaults).
877 : */
878 413834 : slot->tts_values[i] = (Datum) 0;
879 413834 : slot->tts_isnull[i] = true;
880 : }
881 : }
882 :
883 296984 : ExecStoreVirtualTuple(slot);
884 296984 : }
885 :
886 : /*
887 : * Replace updated columns with data from the LogicalRepTupleData struct.
888 : * This is somewhat similar to heap_modify_tuple but also calls the type
889 : * input functions on the user data.
890 : *
891 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
892 : * columns provided in "tupleData" and leaving others as-is.
893 : *
894 : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
895 : * storage for "srcslot". This is OK for current usage, but someday we may
896 : * need to materialize "slot" at the end to make it independent of "srcslot".
897 : */
898 : static void
899 63848 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
900 : LogicalRepRelMapEntry *rel,
901 : LogicalRepTupleData *tupleData)
902 : {
903 63848 : int natts = slot->tts_tupleDescriptor->natts;
904 : int i;
905 :
906 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
907 63848 : ExecClearTuple(slot);
908 :
909 : /*
910 : * Copy all the column data from srcslot, so that we'll have valid values
911 : * for unreplaced columns.
912 : */
913 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
914 63848 : slot_getallattrs(srcslot);
915 63848 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
916 63848 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
917 :
918 : /* Call the "in" function for each replaced attribute */
919 : Assert(natts == rel->attrmap->maplen);
920 318560 : for (i = 0; i < natts; i++)
921 : {
922 254712 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
923 254712 : int remoteattnum = rel->attrmap->attnums[i];
924 :
925 254712 : if (remoteattnum < 0)
926 117038 : continue;
927 :
928 : Assert(remoteattnum < tupleData->ncols);
929 :
930 137674 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
931 : {
932 137668 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
933 :
934 : /* Set attnum for error callback */
935 137668 : apply_error_callback_arg.remote_attnum = remoteattnum;
936 :
937 137668 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
938 : {
939 : Oid typinput;
940 : Oid typioparam;
941 :
942 50860 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
943 101720 : slot->tts_values[i] =
944 50860 : OidInputFunctionCall(typinput, colvalue->data,
945 : typioparam, att->atttypmod);
946 50860 : slot->tts_isnull[i] = false;
947 : }
948 86808 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
949 : {
950 : Oid typreceive;
951 : Oid typioparam;
952 :
953 : /*
954 : * In some code paths we may be asked to re-parse the same
955 : * tuple data. Reset the StringInfo's cursor so that works.
956 : */
957 86712 : colvalue->cursor = 0;
958 :
959 86712 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
960 173424 : slot->tts_values[i] =
961 86712 : OidReceiveFunctionCall(typreceive, colvalue,
962 : typioparam, att->atttypmod);
963 :
964 : /* Trouble if it didn't eat the whole buffer */
965 86712 : if (colvalue->cursor != colvalue->len)
966 0 : ereport(ERROR,
967 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
968 : errmsg("incorrect binary data format in logical replication column %d",
969 : remoteattnum + 1)));
970 86712 : slot->tts_isnull[i] = false;
971 : }
972 : else
973 : {
974 : /* must be LOGICALREP_COLUMN_NULL */
975 96 : slot->tts_values[i] = (Datum) 0;
976 96 : slot->tts_isnull[i] = true;
977 : }
978 :
979 : /* Reset attnum for error callback */
980 137668 : apply_error_callback_arg.remote_attnum = -1;
981 : }
982 : }
983 :
984 : /* And finally, declare that "slot" contains a valid virtual tuple */
985 63848 : ExecStoreVirtualTuple(slot);
986 63848 : }
987 :
988 : /*
989 : * Handle BEGIN message.
990 : */
991 : static void
992 906 : apply_handle_begin(StringInfo s)
993 : {
994 : LogicalRepBeginData begin_data;
995 :
996 : /* There must not be an active streaming transaction. */
997 : Assert(!TransactionIdIsValid(stream_xid));
998 :
999 906 : logicalrep_read_begin(s, &begin_data);
1000 906 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1001 :
1002 906 : remote_final_lsn = begin_data.final_lsn;
1003 :
1004 906 : maybe_start_skipping_changes(begin_data.final_lsn);
1005 :
1006 906 : in_remote_transaction = true;
1007 :
1008 906 : pgstat_report_activity(STATE_RUNNING, NULL);
1009 906 : }
1010 :
1011 : /*
1012 : * Handle COMMIT message.
1013 : *
1014 : * TODO, support tracking of multiple origins
1015 : */
1016 : static void
1017 848 : apply_handle_commit(StringInfo s)
1018 : {
1019 : LogicalRepCommitData commit_data;
1020 :
1021 848 : logicalrep_read_commit(s, &commit_data);
1022 :
1023 848 : if (commit_data.commit_lsn != remote_final_lsn)
1024 0 : ereport(ERROR,
1025 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1026 : errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1027 : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1028 : LSN_FORMAT_ARGS(remote_final_lsn))));
1029 :
1030 848 : apply_handle_commit_internal(&commit_data);
1031 :
1032 : /* Process any tables that are being synchronized in parallel. */
1033 848 : process_syncing_tables(commit_data.end_lsn);
1034 :
1035 848 : pgstat_report_activity(STATE_IDLE, NULL);
1036 848 : reset_apply_error_context_info();
1037 848 : }
1038 :
1039 : /*
1040 : * Handle BEGIN PREPARE message.
1041 : */
1042 : static void
1043 32 : apply_handle_begin_prepare(StringInfo s)
1044 : {
1045 : LogicalRepPreparedTxnData begin_data;
1046 :
1047 : /* Tablesync should never receive prepare. */
1048 32 : if (am_tablesync_worker())
1049 0 : ereport(ERROR,
1050 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1051 : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1052 :
1053 : /* There must not be an active streaming transaction. */
1054 : Assert(!TransactionIdIsValid(stream_xid));
1055 :
1056 32 : logicalrep_read_begin_prepare(s, &begin_data);
1057 32 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1058 :
1059 32 : remote_final_lsn = begin_data.prepare_lsn;
1060 :
1061 32 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1062 :
1063 32 : in_remote_transaction = true;
1064 :
1065 32 : pgstat_report_activity(STATE_RUNNING, NULL);
1066 32 : }
1067 :
1068 : /*
1069 : * Common function to prepare the GID.
1070 : */
1071 : static void
1072 46 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1073 : {
1074 : char gid[GIDSIZE];
1075 :
1076 : /*
1077 : * Compute unique GID for two_phase transactions. We don't use GID of
1078 : * prepared transaction sent by server as that can lead to deadlock when
1079 : * we have multiple subscriptions from same node point to publications on
1080 : * the same node. See comments atop worker.c
1081 : */
1082 46 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1083 : gid, sizeof(gid));
1084 :
1085 : /*
1086 : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1087 : * called within the PrepareTransactionBlock below.
1088 : */
1089 46 : if (!IsTransactionBlock())
1090 : {
1091 46 : BeginTransactionBlock();
1092 46 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1093 : }
1094 :
1095 : /*
1096 : * Update origin state so we can restart streaming from correct position
1097 : * in case of crash.
1098 : */
1099 46 : replorigin_session_origin_lsn = prepare_data->end_lsn;
1100 46 : replorigin_session_origin_timestamp = prepare_data->prepare_time;
1101 :
1102 46 : PrepareTransactionBlock(gid);
1103 46 : }
1104 :
1105 : /*
1106 : * Handle PREPARE message.
1107 : */
1108 : static void
1109 30 : apply_handle_prepare(StringInfo s)
1110 : {
1111 : LogicalRepPreparedTxnData prepare_data;
1112 :
1113 30 : logicalrep_read_prepare(s, &prepare_data);
1114 :
1115 30 : if (prepare_data.prepare_lsn != remote_final_lsn)
1116 0 : ereport(ERROR,
1117 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1118 : errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1119 : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1120 : LSN_FORMAT_ARGS(remote_final_lsn))));
1121 :
1122 : /*
1123 : * Unlike commit, here, we always prepare the transaction even though no
1124 : * change has happened in this transaction or all changes are skipped. It
1125 : * is done this way because at commit prepared time, we won't know whether
1126 : * we have skipped preparing a transaction because of those reasons.
1127 : *
1128 : * XXX, We can optimize such that at commit prepared time, we first check
1129 : * whether we have prepared the transaction or not but that doesn't seem
1130 : * worthwhile because such cases shouldn't be common.
1131 : */
1132 30 : begin_replication_step();
1133 :
1134 30 : apply_handle_prepare_internal(&prepare_data);
1135 :
1136 30 : end_replication_step();
1137 30 : CommitTransactionCommand();
1138 28 : pgstat_report_stat(false);
1139 :
1140 : /*
1141 : * It is okay not to set the local_end LSN for the prepare because we
1142 : * always flush the prepare record. So, we can send the acknowledgment of
1143 : * the remote_end LSN as soon as prepare is finished.
1144 : *
1145 : * XXX For the sake of consistency with commit, we could have set it with
1146 : * the LSN of prepare but as of now we don't track that value similar to
1147 : * XactLastCommitEnd, and adding it for this purpose doesn't seems worth
1148 : * it.
1149 : */
1150 28 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1151 :
1152 28 : in_remote_transaction = false;
1153 :
1154 : /* Process any tables that are being synchronized in parallel. */
1155 28 : process_syncing_tables(prepare_data.end_lsn);
1156 :
1157 : /*
1158 : * Since we have already prepared the transaction, in a case where the
1159 : * server crashes before clearing the subskiplsn, it will be left but the
1160 : * transaction won't be resent. But that's okay because it's a rare case
1161 : * and the subskiplsn will be cleared when finishing the next transaction.
1162 : */
1163 28 : stop_skipping_changes();
1164 28 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1165 :
1166 28 : pgstat_report_activity(STATE_IDLE, NULL);
1167 28 : reset_apply_error_context_info();
1168 28 : }
1169 :
1170 : /*
1171 : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1172 : *
1173 : * Note that we don't need to wait here if the transaction was prepared in a
1174 : * parallel apply worker. In that case, we have already waited for the prepare
1175 : * to finish in apply_handle_stream_prepare() which will ensure all the
1176 : * operations in that transaction have happened in the subscriber, so no
1177 : * concurrent transaction can cause deadlock or transaction dependency issues.
1178 : */
1179 : static void
1180 40 : apply_handle_commit_prepared(StringInfo s)
1181 : {
1182 : LogicalRepCommitPreparedTxnData prepare_data;
1183 : char gid[GIDSIZE];
1184 :
1185 40 : logicalrep_read_commit_prepared(s, &prepare_data);
1186 40 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1187 :
1188 : /* Compute GID for two_phase transactions. */
1189 40 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1190 : gid, sizeof(gid));
1191 :
1192 : /* There is no transaction when COMMIT PREPARED is called */
1193 40 : begin_replication_step();
1194 :
1195 : /*
1196 : * Update origin state so we can restart streaming from correct position
1197 : * in case of crash.
1198 : */
1199 40 : replorigin_session_origin_lsn = prepare_data.end_lsn;
1200 40 : replorigin_session_origin_timestamp = prepare_data.commit_time;
1201 :
1202 40 : FinishPreparedTransaction(gid, true);
1203 40 : end_replication_step();
1204 40 : CommitTransactionCommand();
1205 40 : pgstat_report_stat(false);
1206 :
1207 40 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1208 40 : in_remote_transaction = false;
1209 :
1210 : /* Process any tables that are being synchronized in parallel. */
1211 40 : process_syncing_tables(prepare_data.end_lsn);
1212 :
1213 40 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1214 :
1215 40 : pgstat_report_activity(STATE_IDLE, NULL);
1216 40 : reset_apply_error_context_info();
1217 40 : }
1218 :
1219 : /*
1220 : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1221 : *
1222 : * Note that we don't need to wait here if the transaction was prepared in a
1223 : * parallel apply worker. In that case, we have already waited for the prepare
1224 : * to finish in apply_handle_stream_prepare() which will ensure all the
1225 : * operations in that transaction have happened in the subscriber, so no
1226 : * concurrent transaction can cause deadlock or transaction dependency issues.
1227 : */
1228 : static void
1229 10 : apply_handle_rollback_prepared(StringInfo s)
1230 : {
1231 : LogicalRepRollbackPreparedTxnData rollback_data;
1232 : char gid[GIDSIZE];
1233 :
1234 10 : logicalrep_read_rollback_prepared(s, &rollback_data);
1235 10 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1236 :
1237 : /* Compute GID for two_phase transactions. */
1238 10 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1239 : gid, sizeof(gid));
1240 :
1241 : /*
1242 : * It is possible that we haven't received prepare because it occurred
1243 : * before walsender reached a consistent point or the two_phase was still
1244 : * not enabled by that time, so in such cases, we need to skip rollback
1245 : * prepared.
1246 : */
1247 10 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1248 : rollback_data.prepare_time))
1249 : {
1250 : /*
1251 : * Update origin state so we can restart streaming from correct
1252 : * position in case of crash.
1253 : */
1254 10 : replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1255 10 : replorigin_session_origin_timestamp = rollback_data.rollback_time;
1256 :
1257 : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1258 10 : begin_replication_step();
1259 10 : FinishPreparedTransaction(gid, false);
1260 10 : end_replication_step();
1261 10 : CommitTransactionCommand();
1262 :
1263 10 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1264 : }
1265 :
1266 10 : pgstat_report_stat(false);
1267 :
1268 : /*
1269 : * It is okay not to set the local_end LSN for the rollback of prepared
1270 : * transaction because we always flush the WAL record for it. See
1271 : * apply_handle_prepare.
1272 : */
1273 10 : store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
1274 10 : in_remote_transaction = false;
1275 :
1276 : /* Process any tables that are being synchronized in parallel. */
1277 10 : process_syncing_tables(rollback_data.rollback_end_lsn);
1278 :
1279 10 : pgstat_report_activity(STATE_IDLE, NULL);
1280 10 : reset_apply_error_context_info();
1281 10 : }
1282 :
1283 : /*
1284 : * Handle STREAM PREPARE.
1285 : */
1286 : static void
1287 22 : apply_handle_stream_prepare(StringInfo s)
1288 : {
1289 : LogicalRepPreparedTxnData prepare_data;
1290 : ParallelApplyWorkerInfo *winfo;
1291 : TransApplyAction apply_action;
1292 :
1293 : /* Save the message before it is consumed. */
1294 22 : StringInfoData original_msg = *s;
1295 :
1296 22 : if (in_streamed_transaction)
1297 0 : ereport(ERROR,
1298 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1299 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1300 :
1301 : /* Tablesync should never receive prepare. */
1302 22 : if (am_tablesync_worker())
1303 0 : ereport(ERROR,
1304 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1305 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1306 :
1307 22 : logicalrep_read_stream_prepare(s, &prepare_data);
1308 22 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1309 :
1310 22 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1311 :
1312 22 : switch (apply_action)
1313 : {
1314 10 : case TRANS_LEADER_APPLY:
1315 :
1316 : /*
1317 : * The transaction has been serialized to file, so replay all the
1318 : * spooled operations.
1319 : */
1320 10 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1321 : prepare_data.xid, prepare_data.prepare_lsn);
1322 :
1323 : /* Mark the transaction as prepared. */
1324 10 : apply_handle_prepare_internal(&prepare_data);
1325 :
1326 10 : CommitTransactionCommand();
1327 :
1328 : /*
1329 : * It is okay not to set the local_end LSN for the prepare because
1330 : * we always flush the prepare record. See apply_handle_prepare.
1331 : */
1332 10 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1333 :
1334 10 : in_remote_transaction = false;
1335 :
1336 : /* Unlink the files with serialized changes and subxact info. */
1337 10 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1338 :
1339 10 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1340 10 : break;
1341 :
1342 4 : case TRANS_LEADER_SEND_TO_PARALLEL:
1343 : Assert(winfo);
1344 :
1345 4 : if (pa_send_data(winfo, s->len, s->data))
1346 : {
1347 : /* Finish processing the streaming transaction. */
1348 4 : pa_xact_finish(winfo, prepare_data.end_lsn);
1349 4 : break;
1350 : }
1351 :
1352 : /*
1353 : * Switch to serialize mode when we are not able to send the
1354 : * change to parallel apply worker.
1355 : */
1356 0 : pa_switch_to_partial_serialize(winfo, true);
1357 :
1358 : /* fall through */
1359 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1360 : Assert(winfo);
1361 :
1362 2 : stream_open_and_write_change(prepare_data.xid,
1363 : LOGICAL_REP_MSG_STREAM_PREPARE,
1364 : &original_msg);
1365 :
1366 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1367 :
1368 : /* Finish processing the streaming transaction. */
1369 2 : pa_xact_finish(winfo, prepare_data.end_lsn);
1370 2 : break;
1371 :
1372 6 : case TRANS_PARALLEL_APPLY:
1373 :
1374 : /*
1375 : * If the parallel apply worker is applying spooled messages then
1376 : * close the file before preparing.
1377 : */
1378 6 : if (stream_fd)
1379 2 : stream_close_file();
1380 :
1381 6 : begin_replication_step();
1382 :
1383 : /* Mark the transaction as prepared. */
1384 6 : apply_handle_prepare_internal(&prepare_data);
1385 :
1386 6 : end_replication_step();
1387 :
1388 6 : CommitTransactionCommand();
1389 :
1390 : /*
1391 : * It is okay not to set the local_end LSN for the prepare because
1392 : * we always flush the prepare record. See apply_handle_prepare.
1393 : */
1394 6 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1395 :
1396 6 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1397 6 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1398 :
1399 6 : pa_reset_subtrans();
1400 :
1401 6 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1402 6 : break;
1403 :
1404 0 : default:
1405 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1406 : break;
1407 : }
1408 :
1409 22 : pgstat_report_stat(false);
1410 :
1411 : /* Process any tables that are being synchronized in parallel. */
1412 22 : process_syncing_tables(prepare_data.end_lsn);
1413 :
1414 : /*
1415 : * Similar to prepare case, the subskiplsn could be left in a case of
1416 : * server crash but it's okay. See the comments in apply_handle_prepare().
1417 : */
1418 22 : stop_skipping_changes();
1419 22 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1420 :
1421 22 : pgstat_report_activity(STATE_IDLE, NULL);
1422 :
1423 22 : reset_apply_error_context_info();
1424 22 : }
1425 :
1426 : /*
1427 : * Handle ORIGIN message.
1428 : *
1429 : * TODO, support tracking of multiple origins
1430 : */
1431 : static void
1432 14 : apply_handle_origin(StringInfo s)
1433 : {
1434 : /*
1435 : * ORIGIN message can only come inside streaming transaction or inside
1436 : * remote transaction and before any actual writes.
1437 : */
1438 14 : if (!in_streamed_transaction &&
1439 20 : (!in_remote_transaction ||
1440 10 : (IsTransactionState() && !am_tablesync_worker())))
1441 0 : ereport(ERROR,
1442 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1443 : errmsg_internal("ORIGIN message sent out of order")));
1444 14 : }
1445 :
1446 : /*
1447 : * Initialize fileset (if not already done).
1448 : *
1449 : * Create a new file when first_segment is true, otherwise open the existing
1450 : * file.
1451 : */
1452 : void
1453 726 : stream_start_internal(TransactionId xid, bool first_segment)
1454 : {
1455 726 : begin_replication_step();
1456 :
1457 : /*
1458 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1459 : * used for the entire duration of the worker so create it in a permanent
1460 : * context. We create this on the very first streaming message from any
1461 : * transaction and then use it for this and other streaming transactions.
1462 : * Now, we could create a fileset at the start of the worker as well but
1463 : * then we won't be sure that it will ever be used.
1464 : */
1465 726 : if (!MyLogicalRepWorker->stream_fileset)
1466 : {
1467 : MemoryContext oldctx;
1468 :
1469 28 : oldctx = MemoryContextSwitchTo(ApplyContext);
1470 :
1471 28 : MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1472 28 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1473 :
1474 28 : MemoryContextSwitchTo(oldctx);
1475 : }
1476 :
1477 : /* Open the spool file for this transaction. */
1478 726 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1479 :
1480 : /* If this is not the first segment, open existing subxact file. */
1481 726 : if (!first_segment)
1482 662 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1483 :
1484 726 : end_replication_step();
1485 726 : }
1486 :
1487 : /*
1488 : * Handle STREAM START message.
1489 : */
1490 : static void
1491 1710 : apply_handle_stream_start(StringInfo s)
1492 : {
1493 : bool first_segment;
1494 : ParallelApplyWorkerInfo *winfo;
1495 : TransApplyAction apply_action;
1496 :
1497 : /* Save the message before it is consumed. */
1498 1710 : StringInfoData original_msg = *s;
1499 :
1500 1710 : if (in_streamed_transaction)
1501 0 : ereport(ERROR,
1502 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1503 : errmsg_internal("duplicate STREAM START message")));
1504 :
1505 : /* There must not be an active streaming transaction. */
1506 : Assert(!TransactionIdIsValid(stream_xid));
1507 :
1508 : /* notify handle methods we're processing a remote transaction */
1509 1710 : in_streamed_transaction = true;
1510 :
1511 : /* extract XID of the top-level transaction */
1512 1710 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1513 :
1514 1710 : if (!TransactionIdIsValid(stream_xid))
1515 0 : ereport(ERROR,
1516 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1517 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1518 :
1519 1710 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1520 :
1521 : /* Try to allocate a worker for the streaming transaction. */
1522 1710 : if (first_segment)
1523 164 : pa_allocate_worker(stream_xid);
1524 :
1525 1710 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1526 :
1527 1710 : switch (apply_action)
1528 : {
1529 686 : case TRANS_LEADER_SERIALIZE:
1530 :
1531 : /*
1532 : * Function stream_start_internal starts a transaction. This
1533 : * transaction will be committed on the stream stop unless it is a
1534 : * tablesync worker in which case it will be committed after
1535 : * processing all the messages. We need this transaction for
1536 : * handling the BufFile, used for serializing the streaming data
1537 : * and subxact info.
1538 : */
1539 686 : stream_start_internal(stream_xid, first_segment);
1540 686 : break;
1541 :
1542 500 : case TRANS_LEADER_SEND_TO_PARALLEL:
1543 : Assert(winfo);
1544 :
1545 : /*
1546 : * Once we start serializing the changes, the parallel apply
1547 : * worker will wait for the leader to release the stream lock
1548 : * until the end of the transaction. So, we don't need to release
1549 : * the lock or increment the stream count in that case.
1550 : */
1551 500 : if (pa_send_data(winfo, s->len, s->data))
1552 : {
1553 : /*
1554 : * Unlock the shared object lock so that the parallel apply
1555 : * worker can continue to receive changes.
1556 : */
1557 492 : if (!first_segment)
1558 446 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1559 :
1560 : /*
1561 : * Increment the number of streaming blocks waiting to be
1562 : * processed by parallel apply worker.
1563 : */
1564 492 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1565 :
1566 : /* Cache the parallel apply worker for this transaction. */
1567 492 : pa_set_stream_apply_worker(winfo);
1568 492 : break;
1569 : }
1570 :
1571 : /*
1572 : * Switch to serialize mode when we are not able to send the
1573 : * change to parallel apply worker.
1574 : */
1575 8 : pa_switch_to_partial_serialize(winfo, !first_segment);
1576 :
1577 : /* fall through */
1578 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1579 : Assert(winfo);
1580 :
1581 : /*
1582 : * Open the spool file unless it was already opened when switching
1583 : * to serialize mode. The transaction started in
1584 : * stream_start_internal will be committed on the stream stop.
1585 : */
1586 30 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1587 22 : stream_start_internal(stream_xid, first_segment);
1588 :
1589 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1590 :
1591 : /* Cache the parallel apply worker for this transaction. */
1592 30 : pa_set_stream_apply_worker(winfo);
1593 30 : break;
1594 :
1595 502 : case TRANS_PARALLEL_APPLY:
1596 502 : if (first_segment)
1597 : {
1598 : /* Hold the lock until the end of the transaction. */
1599 54 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1600 54 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1601 :
1602 : /*
1603 : * Signal the leader apply worker, as it may be waiting for
1604 : * us.
1605 : */
1606 54 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
1607 : }
1608 :
1609 502 : parallel_stream_nchanges = 0;
1610 502 : break;
1611 :
1612 0 : default:
1613 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1614 : break;
1615 : }
1616 :
1617 1710 : pgstat_report_activity(STATE_RUNNING, NULL);
1618 1710 : }
1619 :
1620 : /*
1621 : * Update the information about subxacts and close the file.
1622 : *
1623 : * This function should be called when the stream_start_internal function has
1624 : * been called.
1625 : */
1626 : void
1627 726 : stream_stop_internal(TransactionId xid)
1628 : {
1629 : /*
1630 : * Serialize information about subxacts for the toplevel transaction, then
1631 : * close the stream messages spool file.
1632 : */
1633 726 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1634 726 : stream_close_file();
1635 :
1636 : /* We must be in a valid transaction state */
1637 : Assert(IsTransactionState());
1638 :
1639 : /* Commit the per-stream transaction */
1640 726 : CommitTransactionCommand();
1641 :
1642 : /* Reset per-stream context */
1643 726 : MemoryContextReset(LogicalStreamingContext);
1644 726 : }
1645 :
1646 : /*
1647 : * Handle STREAM STOP message.
1648 : */
1649 : static void
1650 1708 : apply_handle_stream_stop(StringInfo s)
1651 : {
1652 : ParallelApplyWorkerInfo *winfo;
1653 : TransApplyAction apply_action;
1654 :
1655 1708 : if (!in_streamed_transaction)
1656 0 : ereport(ERROR,
1657 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1658 : errmsg_internal("STREAM STOP message without STREAM START")));
1659 :
1660 1708 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1661 :
1662 1708 : switch (apply_action)
1663 : {
1664 686 : case TRANS_LEADER_SERIALIZE:
1665 686 : stream_stop_internal(stream_xid);
1666 686 : break;
1667 :
1668 492 : case TRANS_LEADER_SEND_TO_PARALLEL:
1669 : Assert(winfo);
1670 :
1671 : /*
1672 : * Lock before sending the STREAM_STOP message so that the leader
1673 : * can hold the lock first and the parallel apply worker will wait
1674 : * for leader to release the lock. See Locking Considerations atop
1675 : * applyparallelworker.c.
1676 : */
1677 492 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1678 :
1679 492 : if (pa_send_data(winfo, s->len, s->data))
1680 : {
1681 492 : pa_set_stream_apply_worker(NULL);
1682 492 : break;
1683 : }
1684 :
1685 : /*
1686 : * Switch to serialize mode when we are not able to send the
1687 : * change to parallel apply worker.
1688 : */
1689 0 : pa_switch_to_partial_serialize(winfo, true);
1690 :
1691 : /* fall through */
1692 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1693 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1694 30 : stream_stop_internal(stream_xid);
1695 30 : pa_set_stream_apply_worker(NULL);
1696 30 : break;
1697 :
1698 500 : case TRANS_PARALLEL_APPLY:
1699 500 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1700 : parallel_stream_nchanges);
1701 :
1702 : /*
1703 : * By the time parallel apply worker is processing the changes in
1704 : * the current streaming block, the leader apply worker may have
1705 : * sent multiple streaming blocks. This can lead to parallel apply
1706 : * worker start waiting even when there are more chunk of streams
1707 : * in the queue. So, try to lock only if there is no message left
1708 : * in the queue. See Locking Considerations atop
1709 : * applyparallelworker.c.
1710 : *
1711 : * Note that here we have a race condition where we can start
1712 : * waiting even when there are pending streaming chunks. This can
1713 : * happen if the leader sends another streaming block and acquires
1714 : * the stream lock again after the parallel apply worker checks
1715 : * that there is no pending streaming block and before it actually
1716 : * starts waiting on a lock. We can handle this case by not
1717 : * allowing the leader to increment the stream block count during
1718 : * the time parallel apply worker acquires the lock but it is not
1719 : * clear whether that is worth the complexity.
1720 : *
1721 : * Now, if this missed chunk contains rollback to savepoint, then
1722 : * there is a risk of deadlock which probably shouldn't happen
1723 : * after restart.
1724 : */
1725 500 : pa_decr_and_wait_stream_block();
1726 496 : break;
1727 :
1728 0 : default:
1729 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1730 : break;
1731 : }
1732 :
1733 1704 : in_streamed_transaction = false;
1734 1704 : stream_xid = InvalidTransactionId;
1735 :
1736 : /*
1737 : * The parallel apply worker could be in a transaction in which case we
1738 : * need to report the state as STATE_IDLEINTRANSACTION.
1739 : */
1740 1704 : if (IsTransactionOrTransactionBlock())
1741 496 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1742 : else
1743 1208 : pgstat_report_activity(STATE_IDLE, NULL);
1744 :
1745 1704 : reset_apply_error_context_info();
1746 1704 : }
1747 :
1748 : /*
1749 : * Helper function to handle STREAM ABORT message when the transaction was
1750 : * serialized to file.
1751 : */
1752 : static void
1753 28 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1754 : {
1755 : /*
1756 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1757 : * just delete the files with serialized info.
1758 : */
1759 28 : if (xid == subxid)
1760 2 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1761 : else
1762 : {
1763 : /*
1764 : * OK, so it's a subxact. We need to read the subxact file for the
1765 : * toplevel transaction, determine the offset tracked for the subxact,
1766 : * and truncate the file with changes. We also remove the subxacts
1767 : * with higher offsets (or rather higher XIDs).
1768 : *
1769 : * We intentionally scan the array from the tail, because we're likely
1770 : * aborting a change for the most recent subtransactions.
1771 : *
1772 : * We can't use the binary search here as subxact XIDs won't
1773 : * necessarily arrive in sorted order, consider the case where we have
1774 : * released the savepoint for multiple subtransactions and then
1775 : * performed rollback to savepoint for one of the earlier
1776 : * sub-transaction.
1777 : */
1778 : int64 i;
1779 : int64 subidx;
1780 : BufFile *fd;
1781 26 : bool found = false;
1782 : char path[MAXPGPATH];
1783 :
1784 26 : subidx = -1;
1785 26 : begin_replication_step();
1786 26 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1787 :
1788 30 : for (i = subxact_data.nsubxacts; i > 0; i--)
1789 : {
1790 22 : if (subxact_data.subxacts[i - 1].xid == subxid)
1791 : {
1792 18 : subidx = (i - 1);
1793 18 : found = true;
1794 18 : break;
1795 : }
1796 : }
1797 :
1798 : /*
1799 : * If it's an empty sub-transaction then we will not find the subxid
1800 : * here so just cleanup the subxact info and return.
1801 : */
1802 26 : if (!found)
1803 : {
1804 : /* Cleanup the subxact info */
1805 8 : cleanup_subxact_info();
1806 8 : end_replication_step();
1807 8 : CommitTransactionCommand();
1808 8 : return;
1809 : }
1810 :
1811 : /* open the changes file */
1812 18 : changes_filename(path, MyLogicalRepWorker->subid, xid);
1813 18 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
1814 : O_RDWR, false);
1815 :
1816 : /* OK, truncate the file at the right offset */
1817 18 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
1818 18 : subxact_data.subxacts[subidx].offset);
1819 18 : BufFileClose(fd);
1820 :
1821 : /* discard the subxacts added later */
1822 18 : subxact_data.nsubxacts = subidx;
1823 :
1824 : /* write the updated subxact list */
1825 18 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1826 :
1827 18 : end_replication_step();
1828 18 : CommitTransactionCommand();
1829 : }
1830 : }
1831 :
1832 : /*
1833 : * Handle STREAM ABORT message.
1834 : */
1835 : static void
1836 76 : apply_handle_stream_abort(StringInfo s)
1837 : {
1838 : TransactionId xid;
1839 : TransactionId subxid;
1840 : LogicalRepStreamAbortData abort_data;
1841 : ParallelApplyWorkerInfo *winfo;
1842 : TransApplyAction apply_action;
1843 :
1844 : /* Save the message before it is consumed. */
1845 76 : StringInfoData original_msg = *s;
1846 : bool toplevel_xact;
1847 :
1848 76 : if (in_streamed_transaction)
1849 0 : ereport(ERROR,
1850 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1851 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
1852 :
1853 : /* We receive abort information only when we can apply in parallel. */
1854 76 : logicalrep_read_stream_abort(s, &abort_data,
1855 76 : MyLogicalRepWorker->parallel_apply);
1856 :
1857 76 : xid = abort_data.xid;
1858 76 : subxid = abort_data.subxid;
1859 76 : toplevel_xact = (xid == subxid);
1860 :
1861 76 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1862 :
1863 76 : apply_action = get_transaction_apply_action(xid, &winfo);
1864 :
1865 76 : switch (apply_action)
1866 : {
1867 28 : case TRANS_LEADER_APPLY:
1868 :
1869 : /*
1870 : * We are in the leader apply worker and the transaction has been
1871 : * serialized to file.
1872 : */
1873 28 : stream_abort_internal(xid, subxid);
1874 :
1875 28 : elog(DEBUG1, "finished processing the STREAM ABORT command");
1876 28 : break;
1877 :
1878 20 : case TRANS_LEADER_SEND_TO_PARALLEL:
1879 : Assert(winfo);
1880 :
1881 : /*
1882 : * For the case of aborting the subtransaction, we increment the
1883 : * number of streaming blocks and take the lock again before
1884 : * sending the STREAM_ABORT to ensure that the parallel apply
1885 : * worker will wait on the lock for the next set of changes after
1886 : * processing the STREAM_ABORT message if it is not already
1887 : * waiting for STREAM_STOP message.
1888 : *
1889 : * It is important to perform this locking before sending the
1890 : * STREAM_ABORT message so that the leader can hold the lock first
1891 : * and the parallel apply worker will wait for the leader to
1892 : * release the lock. This is the same as what we do in
1893 : * apply_handle_stream_stop. See Locking Considerations atop
1894 : * applyparallelworker.c.
1895 : */
1896 20 : if (!toplevel_xact)
1897 : {
1898 18 : pa_unlock_stream(xid, AccessExclusiveLock);
1899 18 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1900 18 : pa_lock_stream(xid, AccessExclusiveLock);
1901 : }
1902 :
1903 20 : if (pa_send_data(winfo, s->len, s->data))
1904 : {
1905 : /*
1906 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1907 : * wait here for the parallel apply worker to finish as that
1908 : * is not required to maintain the commit order and won't have
1909 : * the risk of failures due to transaction dependencies and
1910 : * deadlocks. However, it is possible that before the parallel
1911 : * worker finishes and we clear the worker info, the xid
1912 : * wraparound happens on the upstream and a new transaction
1913 : * with the same xid can appear and that can lead to duplicate
1914 : * entries in ParallelApplyTxnHash. Yet another problem could
1915 : * be that we may have serialized the changes in partial
1916 : * serialize mode and the file containing xact changes may
1917 : * already exist, and after xid wraparound trying to create
1918 : * the file for the same xid can lead to an error. To avoid
1919 : * these problems, we decide to wait for the aborts to finish.
1920 : *
1921 : * Note, it is okay to not update the flush location position
1922 : * for aborts as in worst case that means such a transaction
1923 : * won't be sent again after restart.
1924 : */
1925 20 : if (toplevel_xact)
1926 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
1927 :
1928 20 : break;
1929 : }
1930 :
1931 : /*
1932 : * Switch to serialize mode when we are not able to send the
1933 : * change to parallel apply worker.
1934 : */
1935 0 : pa_switch_to_partial_serialize(winfo, true);
1936 :
1937 : /* fall through */
1938 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1939 : Assert(winfo);
1940 :
1941 : /*
1942 : * Parallel apply worker might have applied some changes, so write
1943 : * the STREAM_ABORT message so that it can rollback the
1944 : * subtransaction if needed.
1945 : */
1946 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
1947 : &original_msg);
1948 :
1949 4 : if (toplevel_xact)
1950 : {
1951 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1952 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
1953 : }
1954 4 : break;
1955 :
1956 24 : case TRANS_PARALLEL_APPLY:
1957 :
1958 : /*
1959 : * If the parallel apply worker is applying spooled messages then
1960 : * close the file before aborting.
1961 : */
1962 24 : if (toplevel_xact && stream_fd)
1963 2 : stream_close_file();
1964 :
1965 24 : pa_stream_abort(&abort_data);
1966 :
1967 : /*
1968 : * We need to wait after processing rollback to savepoint for the
1969 : * next set of changes.
1970 : *
1971 : * We have a race condition here due to which we can start waiting
1972 : * here when there are more chunk of streams in the queue. See
1973 : * apply_handle_stream_stop.
1974 : */
1975 24 : if (!toplevel_xact)
1976 20 : pa_decr_and_wait_stream_block();
1977 :
1978 24 : elog(DEBUG1, "finished processing the STREAM ABORT command");
1979 24 : break;
1980 :
1981 0 : default:
1982 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1983 : break;
1984 : }
1985 :
1986 76 : reset_apply_error_context_info();
1987 76 : }
1988 :
1989 : /*
1990 : * Ensure that the passed location is fileset's end.
1991 : */
1992 : static void
1993 8 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1994 : off_t offset)
1995 : {
1996 : char path[MAXPGPATH];
1997 : BufFile *fd;
1998 : int last_fileno;
1999 : off_t last_offset;
2000 :
2001 : Assert(!IsTransactionState());
2002 :
2003 8 : begin_replication_step();
2004 :
2005 8 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2006 :
2007 8 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2008 :
2009 8 : BufFileSeek(fd, 0, 0, SEEK_END);
2010 8 : BufFileTell(fd, &last_fileno, &last_offset);
2011 :
2012 8 : BufFileClose(fd);
2013 :
2014 8 : end_replication_step();
2015 :
2016 8 : if (last_fileno != fileno || last_offset != offset)
2017 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2018 : path);
2019 8 : }
2020 :
2021 : /*
2022 : * Common spoolfile processing.
2023 : */
2024 : void
2025 62 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2026 : XLogRecPtr lsn)
2027 : {
2028 : int nchanges;
2029 : char path[MAXPGPATH];
2030 62 : char *buffer = NULL;
2031 : MemoryContext oldcxt;
2032 : ResourceOwner oldowner;
2033 : int fileno;
2034 : off_t offset;
2035 :
2036 62 : if (!am_parallel_apply_worker())
2037 54 : maybe_start_skipping_changes(lsn);
2038 :
2039 : /* Make sure we have an open transaction */
2040 62 : begin_replication_step();
2041 :
2042 : /*
2043 : * Allocate file handle and memory required to process all the messages in
2044 : * TopTransactionContext to avoid them getting reset after each message is
2045 : * processed.
2046 : */
2047 62 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2048 :
2049 : /* Open the spool file for the committed/prepared transaction */
2050 62 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2051 62 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2052 :
2053 : /*
2054 : * Make sure the file is owned by the toplevel transaction so that the
2055 : * file will not be accidentally closed when aborting a subtransaction.
2056 : */
2057 62 : oldowner = CurrentResourceOwner;
2058 62 : CurrentResourceOwner = TopTransactionResourceOwner;
2059 :
2060 62 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2061 :
2062 62 : CurrentResourceOwner = oldowner;
2063 :
2064 62 : buffer = palloc(BLCKSZ);
2065 :
2066 62 : MemoryContextSwitchTo(oldcxt);
2067 :
2068 62 : remote_final_lsn = lsn;
2069 :
2070 : /*
2071 : * Make sure the handle apply_dispatch methods are aware we're in a remote
2072 : * transaction.
2073 : */
2074 62 : in_remote_transaction = true;
2075 62 : pgstat_report_activity(STATE_RUNNING, NULL);
2076 :
2077 62 : end_replication_step();
2078 :
2079 : /*
2080 : * Read the entries one by one and pass them through the same logic as in
2081 : * apply_dispatch.
2082 : */
2083 62 : nchanges = 0;
2084 : while (true)
2085 176938 : {
2086 : StringInfoData s2;
2087 : size_t nbytes;
2088 : int len;
2089 :
2090 177000 : CHECK_FOR_INTERRUPTS();
2091 :
2092 : /* read length of the on-disk record */
2093 177000 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2094 :
2095 : /* have we reached end of the file? */
2096 177000 : if (nbytes == 0)
2097 52 : break;
2098 :
2099 : /* do we have a correct length? */
2100 176948 : if (len <= 0)
2101 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2102 : len, path);
2103 :
2104 : /* make sure we have sufficiently large buffer */
2105 176948 : buffer = repalloc(buffer, len);
2106 :
2107 : /* and finally read the data into the buffer */
2108 176948 : BufFileReadExact(stream_fd, buffer, len);
2109 :
2110 176948 : BufFileTell(stream_fd, &fileno, &offset);
2111 :
2112 : /* init a stringinfo using the buffer and call apply_dispatch */
2113 176948 : initReadOnlyStringInfo(&s2, buffer, len);
2114 :
2115 : /* Ensure we are reading the data into our memory context. */
2116 176948 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2117 :
2118 176948 : apply_dispatch(&s2);
2119 :
2120 176946 : MemoryContextReset(ApplyMessageContext);
2121 :
2122 176946 : MemoryContextSwitchTo(oldcxt);
2123 :
2124 176946 : nchanges++;
2125 :
2126 : /*
2127 : * It is possible the file has been closed because we have processed
2128 : * the transaction end message like stream_commit in which case that
2129 : * must be the last message.
2130 : */
2131 176946 : if (!stream_fd)
2132 : {
2133 8 : ensure_last_message(stream_fileset, xid, fileno, offset);
2134 8 : break;
2135 : }
2136 :
2137 176938 : if (nchanges % 1000 == 0)
2138 166 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2139 : nchanges, path);
2140 : }
2141 :
2142 60 : if (stream_fd)
2143 52 : stream_close_file();
2144 :
2145 60 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2146 : nchanges, path);
2147 :
2148 60 : return;
2149 : }
2150 :
2151 : /*
2152 : * Handle STREAM COMMIT message.
2153 : */
2154 : static void
2155 122 : apply_handle_stream_commit(StringInfo s)
2156 : {
2157 : TransactionId xid;
2158 : LogicalRepCommitData commit_data;
2159 : ParallelApplyWorkerInfo *winfo;
2160 : TransApplyAction apply_action;
2161 :
2162 : /* Save the message before it is consumed. */
2163 122 : StringInfoData original_msg = *s;
2164 :
2165 122 : if (in_streamed_transaction)
2166 0 : ereport(ERROR,
2167 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2168 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2169 :
2170 122 : xid = logicalrep_read_stream_commit(s, &commit_data);
2171 122 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2172 :
2173 122 : apply_action = get_transaction_apply_action(xid, &winfo);
2174 :
2175 122 : switch (apply_action)
2176 : {
2177 44 : case TRANS_LEADER_APPLY:
2178 :
2179 : /*
2180 : * The transaction has been serialized to file, so replay all the
2181 : * spooled operations.
2182 : */
2183 44 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2184 : commit_data.commit_lsn);
2185 :
2186 42 : apply_handle_commit_internal(&commit_data);
2187 :
2188 : /* Unlink the files with serialized changes and subxact info. */
2189 42 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2190 :
2191 42 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2192 42 : break;
2193 :
2194 36 : case TRANS_LEADER_SEND_TO_PARALLEL:
2195 : Assert(winfo);
2196 :
2197 36 : if (pa_send_data(winfo, s->len, s->data))
2198 : {
2199 : /* Finish processing the streaming transaction. */
2200 36 : pa_xact_finish(winfo, commit_data.end_lsn);
2201 34 : break;
2202 : }
2203 :
2204 : /*
2205 : * Switch to serialize mode when we are not able to send the
2206 : * change to parallel apply worker.
2207 : */
2208 0 : pa_switch_to_partial_serialize(winfo, true);
2209 :
2210 : /* fall through */
2211 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2212 : Assert(winfo);
2213 :
2214 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2215 : &original_msg);
2216 :
2217 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2218 :
2219 : /* Finish processing the streaming transaction. */
2220 4 : pa_xact_finish(winfo, commit_data.end_lsn);
2221 4 : break;
2222 :
2223 38 : case TRANS_PARALLEL_APPLY:
2224 :
2225 : /*
2226 : * If the parallel apply worker is applying spooled messages then
2227 : * close the file before committing.
2228 : */
2229 38 : if (stream_fd)
2230 4 : stream_close_file();
2231 :
2232 38 : apply_handle_commit_internal(&commit_data);
2233 :
2234 38 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2235 :
2236 : /*
2237 : * It is important to set the transaction state as finished before
2238 : * releasing the lock. See pa_wait_for_xact_finish.
2239 : */
2240 38 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2241 38 : pa_unlock_transaction(xid, AccessExclusiveLock);
2242 :
2243 38 : pa_reset_subtrans();
2244 :
2245 38 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2246 38 : break;
2247 :
2248 0 : default:
2249 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2250 : break;
2251 : }
2252 :
2253 : /* Process any tables that are being synchronized in parallel. */
2254 118 : process_syncing_tables(commit_data.end_lsn);
2255 :
2256 118 : pgstat_report_activity(STATE_IDLE, NULL);
2257 :
2258 118 : reset_apply_error_context_info();
2259 118 : }
2260 :
2261 : /*
2262 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2263 : */
2264 : static void
2265 928 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2266 : {
2267 928 : if (is_skipping_changes())
2268 : {
2269 4 : stop_skipping_changes();
2270 :
2271 : /*
2272 : * Start a new transaction to clear the subskiplsn, if not started
2273 : * yet.
2274 : */
2275 4 : if (!IsTransactionState())
2276 2 : StartTransactionCommand();
2277 : }
2278 :
2279 928 : if (IsTransactionState())
2280 : {
2281 : /*
2282 : * The transaction is either non-empty or skipped, so we clear the
2283 : * subskiplsn.
2284 : */
2285 928 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2286 :
2287 : /*
2288 : * Update origin state so we can restart streaming from correct
2289 : * position in case of crash.
2290 : */
2291 928 : replorigin_session_origin_lsn = commit_data->end_lsn;
2292 928 : replorigin_session_origin_timestamp = commit_data->committime;
2293 :
2294 928 : CommitTransactionCommand();
2295 :
2296 928 : if (IsTransactionBlock())
2297 : {
2298 8 : EndTransactionBlock(false);
2299 8 : CommitTransactionCommand();
2300 : }
2301 :
2302 928 : pgstat_report_stat(false);
2303 :
2304 928 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2305 : }
2306 : else
2307 : {
2308 : /* Process any invalidation messages that might have accumulated. */
2309 0 : AcceptInvalidationMessages();
2310 0 : maybe_reread_subscription();
2311 : }
2312 :
2313 928 : in_remote_transaction = false;
2314 928 : }
2315 :
2316 : /*
2317 : * Handle RELATION message.
2318 : *
2319 : * Note we don't do validation against local schema here. The validation
2320 : * against local schema is postponed until first change for given relation
2321 : * comes as we only care about it when applying changes for it anyway and we
2322 : * do less locking this way.
2323 : */
2324 : static void
2325 864 : apply_handle_relation(StringInfo s)
2326 : {
2327 : LogicalRepRelation *rel;
2328 :
2329 864 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2330 70 : return;
2331 :
2332 794 : rel = logicalrep_read_rel(s);
2333 794 : logicalrep_relmap_update(rel);
2334 :
2335 : /* Also reset all entries in the partition map that refer to remoterel. */
2336 794 : logicalrep_partmap_reset_relmap(rel);
2337 : }
2338 :
2339 : /*
2340 : * Handle TYPE message.
2341 : *
2342 : * This implementation pays no attention to TYPE messages; we expect the user
2343 : * to have set things up so that the incoming data is acceptable to the input
2344 : * functions for the locally subscribed tables. Hence, we just read and
2345 : * discard the message.
2346 : */
2347 : static void
2348 36 : apply_handle_type(StringInfo s)
2349 : {
2350 : LogicalRepTyp typ;
2351 :
2352 36 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2353 0 : return;
2354 :
2355 36 : logicalrep_read_typ(s, &typ);
2356 : }
2357 :
2358 : /*
2359 : * Check that we (the subscription owner) have sufficient privileges on the
2360 : * target relation to perform the given operation.
2361 : */
2362 : static void
2363 441504 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2364 : {
2365 : Oid relid;
2366 : AclResult aclresult;
2367 :
2368 441504 : relid = RelationGetRelid(rel);
2369 441504 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2370 441504 : if (aclresult != ACLCHECK_OK)
2371 14 : aclcheck_error(aclresult,
2372 14 : get_relkind_objtype(rel->rd_rel->relkind),
2373 14 : get_rel_name(relid));
2374 :
2375 : /*
2376 : * We lack the infrastructure to honor RLS policies. It might be possible
2377 : * to add such infrastructure here, but tablesync workers lack it, too, so
2378 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2379 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2380 : * replicate subsequent INSERTs, so we forbid all commands the same.
2381 : */
2382 441490 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2383 6 : ereport(ERROR,
2384 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2385 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2386 : GetUserNameFromId(GetUserId(), true),
2387 : RelationGetRelationName(rel))));
2388 441484 : }
2389 :
2390 : /*
2391 : * Handle INSERT message.
2392 : */
2393 :
2394 : static void
2395 372590 : apply_handle_insert(StringInfo s)
2396 : {
2397 : LogicalRepRelMapEntry *rel;
2398 : LogicalRepTupleData newtup;
2399 : LogicalRepRelId relid;
2400 : UserContext ucxt;
2401 : ApplyExecutionData *edata;
2402 : EState *estate;
2403 : TupleTableSlot *remoteslot;
2404 : MemoryContext oldctx;
2405 : bool run_as_owner;
2406 :
2407 : /*
2408 : * Quick return if we are skipping data modification changes or handling
2409 : * streamed transactions.
2410 : */
2411 725178 : if (is_skipping_changes() ||
2412 352588 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2413 220112 : return;
2414 :
2415 152576 : begin_replication_step();
2416 :
2417 152572 : relid = logicalrep_read_insert(s, &newtup);
2418 152572 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2419 152560 : if (!should_apply_changes_for_rel(rel))
2420 : {
2421 : /*
2422 : * The relation can't become interesting in the middle of the
2423 : * transaction so it's safe to unlock it.
2424 : */
2425 98 : logicalrep_rel_close(rel, RowExclusiveLock);
2426 98 : end_replication_step();
2427 98 : return;
2428 : }
2429 :
2430 : /*
2431 : * Make sure that any user-supplied code runs as the table owner, unless
2432 : * the user has opted out of that behavior.
2433 : */
2434 152462 : run_as_owner = MySubscription->runasowner;
2435 152462 : if (!run_as_owner)
2436 152446 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2437 :
2438 : /* Set relation for error callback */
2439 152462 : apply_error_callback_arg.rel = rel;
2440 :
2441 : /* Initialize the executor state. */
2442 152462 : edata = create_edata_for_relation(rel);
2443 152462 : estate = edata->estate;
2444 152462 : remoteslot = ExecInitExtraTupleSlot(estate,
2445 152462 : RelationGetDescr(rel->localrel),
2446 : &TTSOpsVirtual);
2447 :
2448 : /* Process and store remote tuple in the slot */
2449 152462 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2450 152462 : slot_store_data(remoteslot, rel, &newtup);
2451 152462 : slot_fill_defaults(rel, estate, remoteslot);
2452 152462 : MemoryContextSwitchTo(oldctx);
2453 :
2454 : /* For a partitioned table, insert the tuple into a partition. */
2455 152462 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2456 90 : apply_handle_tuple_routing(edata,
2457 : remoteslot, NULL, CMD_INSERT);
2458 : else
2459 : {
2460 152372 : ResultRelInfo *relinfo = edata->targetRelInfo;
2461 :
2462 152372 : ExecOpenIndices(relinfo, false);
2463 152372 : apply_handle_insert_internal(edata, relinfo, remoteslot);
2464 152344 : ExecCloseIndices(relinfo);
2465 : }
2466 :
2467 152432 : finish_edata(edata);
2468 :
2469 : /* Reset relation for error callback */
2470 152432 : apply_error_callback_arg.rel = NULL;
2471 :
2472 152432 : if (!run_as_owner)
2473 152422 : RestoreUserContext(&ucxt);
2474 :
2475 152432 : logicalrep_rel_close(rel, NoLock);
2476 :
2477 152432 : end_replication_step();
2478 : }
2479 :
2480 : /*
2481 : * Workhorse for apply_handle_insert()
2482 : * relinfo is for the relation we're actually inserting into
2483 : * (could be a child partition of edata->targetRelInfo)
2484 : */
2485 : static void
2486 152464 : apply_handle_insert_internal(ApplyExecutionData *edata,
2487 : ResultRelInfo *relinfo,
2488 : TupleTableSlot *remoteslot)
2489 : {
2490 152464 : EState *estate = edata->estate;
2491 :
2492 : /* Caller should have opened indexes already. */
2493 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2494 : !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
2495 : RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
2496 :
2497 : /* Caller will not have done this bit. */
2498 : Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
2499 152464 : InitConflictIndexes(relinfo);
2500 :
2501 : /* Do the insert. */
2502 152464 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2503 152452 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2504 152434 : }
2505 :
2506 : /*
2507 : * Check if the logical replication relation is updatable and throw
2508 : * appropriate error if it isn't.
2509 : */
2510 : static void
2511 144566 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2512 : {
2513 : /*
2514 : * For partitioned tables, we only need to care if the target partition is
2515 : * updatable (aka has PK or RI defined for it).
2516 : */
2517 144566 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2518 60 : return;
2519 :
2520 : /* Updatable, no error. */
2521 144506 : if (rel->updatable)
2522 144506 : return;
2523 :
2524 : /*
2525 : * We are in error mode so it's fine this is somewhat slow. It's better to
2526 : * give user correct error.
2527 : */
2528 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2529 : {
2530 0 : ereport(ERROR,
2531 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2532 : errmsg("publisher did not send replica identity column "
2533 : "expected by the logical replication target relation \"%s.%s\"",
2534 : rel->remoterel.nspname, rel->remoterel.relname)));
2535 : }
2536 :
2537 0 : ereport(ERROR,
2538 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2539 : errmsg("logical replication target relation \"%s.%s\" has "
2540 : "neither REPLICA IDENTITY index nor PRIMARY "
2541 : "KEY and published relation does not have "
2542 : "REPLICA IDENTITY FULL",
2543 : rel->remoterel.nspname, rel->remoterel.relname)));
2544 : }
2545 :
2546 : /*
2547 : * Handle UPDATE message.
2548 : *
2549 : * TODO: FDW support
2550 : */
2551 : static void
2552 132318 : apply_handle_update(StringInfo s)
2553 : {
2554 : LogicalRepRelMapEntry *rel;
2555 : LogicalRepRelId relid;
2556 : UserContext ucxt;
2557 : ApplyExecutionData *edata;
2558 : EState *estate;
2559 : LogicalRepTupleData oldtup;
2560 : LogicalRepTupleData newtup;
2561 : bool has_oldtup;
2562 : TupleTableSlot *remoteslot;
2563 : RTEPermissionInfo *target_perminfo;
2564 : MemoryContext oldctx;
2565 : bool run_as_owner;
2566 :
2567 : /*
2568 : * Quick return if we are skipping data modification changes or handling
2569 : * streamed transactions.
2570 : */
2571 264630 : if (is_skipping_changes() ||
2572 132312 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
2573 68446 : return;
2574 :
2575 63872 : begin_replication_step();
2576 :
2577 63870 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2578 : &newtup);
2579 63870 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2580 63870 : if (!should_apply_changes_for_rel(rel))
2581 : {
2582 : /*
2583 : * The relation can't become interesting in the middle of the
2584 : * transaction so it's safe to unlock it.
2585 : */
2586 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2587 0 : end_replication_step();
2588 0 : return;
2589 : }
2590 :
2591 : /* Set relation for error callback */
2592 63870 : apply_error_callback_arg.rel = rel;
2593 :
2594 : /* Check if we can do the update. */
2595 63870 : check_relation_updatable(rel);
2596 :
2597 : /*
2598 : * Make sure that any user-supplied code runs as the table owner, unless
2599 : * the user has opted out of that behavior.
2600 : */
2601 63870 : run_as_owner = MySubscription->runasowner;
2602 63870 : if (!run_as_owner)
2603 63864 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2604 :
2605 : /* Initialize the executor state. */
2606 63868 : edata = create_edata_for_relation(rel);
2607 63868 : estate = edata->estate;
2608 63868 : remoteslot = ExecInitExtraTupleSlot(estate,
2609 63868 : RelationGetDescr(rel->localrel),
2610 : &TTSOpsVirtual);
2611 :
2612 : /*
2613 : * Populate updatedCols so that per-column triggers can fire, and so
2614 : * executor can correctly pass down indexUnchanged hint. This could
2615 : * include more columns than were actually changed on the publisher
2616 : * because the logical replication protocol doesn't contain that
2617 : * information. But it would for example exclude columns that only exist
2618 : * on the subscriber, since we are not touching those.
2619 : */
2620 63868 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
2621 318608 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2622 : {
2623 254740 : Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
2624 254740 : int remoteattnum = rel->attrmap->attnums[i];
2625 :
2626 254740 : if (!att->attisdropped && remoteattnum >= 0)
2627 : {
2628 : Assert(remoteattnum < newtup.ncols);
2629 137706 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2630 137700 : target_perminfo->updatedCols =
2631 137700 : bms_add_member(target_perminfo->updatedCols,
2632 : i + 1 - FirstLowInvalidHeapAttributeNumber);
2633 : }
2634 : }
2635 :
2636 : /* Build the search tuple. */
2637 63868 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2638 63868 : slot_store_data(remoteslot, rel,
2639 63868 : has_oldtup ? &oldtup : &newtup);
2640 63868 : MemoryContextSwitchTo(oldctx);
2641 :
2642 : /* For a partitioned table, apply update to correct partition. */
2643 63868 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2644 26 : apply_handle_tuple_routing(edata,
2645 : remoteslot, &newtup, CMD_UPDATE);
2646 : else
2647 63842 : apply_handle_update_internal(edata, edata->targetRelInfo,
2648 : remoteslot, &newtup, rel->localindexoid);
2649 :
2650 63856 : finish_edata(edata);
2651 :
2652 : /* Reset relation for error callback */
2653 63856 : apply_error_callback_arg.rel = NULL;
2654 :
2655 63856 : if (!run_as_owner)
2656 63852 : RestoreUserContext(&ucxt);
2657 :
2658 63856 : logicalrep_rel_close(rel, NoLock);
2659 :
2660 63856 : end_replication_step();
2661 : }
2662 :
2663 : /*
2664 : * Workhorse for apply_handle_update()
2665 : * relinfo is for the relation we're actually updating in
2666 : * (could be a child partition of edata->targetRelInfo)
2667 : */
2668 : static void
2669 63842 : apply_handle_update_internal(ApplyExecutionData *edata,
2670 : ResultRelInfo *relinfo,
2671 : TupleTableSlot *remoteslot,
2672 : LogicalRepTupleData *newtup,
2673 : Oid localindexoid)
2674 : {
2675 63842 : EState *estate = edata->estate;
2676 63842 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2677 63842 : Relation localrel = relinfo->ri_RelationDesc;
2678 : EPQState epqstate;
2679 63842 : TupleTableSlot *localslot = NULL;
2680 63842 : ConflictTupleInfo conflicttuple = {0};
2681 : bool found;
2682 : MemoryContext oldctx;
2683 :
2684 63842 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2685 63842 : ExecOpenIndices(relinfo, false);
2686 :
2687 63842 : found = FindReplTupleInLocalRel(edata, localrel,
2688 : &relmapentry->remoterel,
2689 : localindexoid,
2690 : remoteslot, &localslot);
2691 :
2692 : /*
2693 : * Tuple found.
2694 : *
2695 : * Note this will fail if there are other conflicting unique indexes.
2696 : */
2697 63834 : if (found)
2698 : {
2699 : /*
2700 : * Report the conflict if the tuple was modified by a different
2701 : * origin.
2702 : */
2703 63826 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2704 4 : &conflicttuple.origin, &conflicttuple.ts) &&
2705 4 : conflicttuple.origin != replorigin_session_origin)
2706 : {
2707 : TupleTableSlot *newslot;
2708 :
2709 : /* Store the new tuple for conflict reporting */
2710 4 : newslot = table_slot_create(localrel, &estate->es_tupleTable);
2711 4 : slot_store_data(newslot, relmapentry, newtup);
2712 :
2713 4 : conflicttuple.slot = localslot;
2714 :
2715 4 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
2716 : remoteslot, newslot,
2717 4 : list_make1(&conflicttuple));
2718 : }
2719 :
2720 : /* Process and store remote tuple in the slot */
2721 63826 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2722 63826 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2723 63826 : MemoryContextSwitchTo(oldctx);
2724 :
2725 63826 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2726 :
2727 63826 : InitConflictIndexes(relinfo);
2728 :
2729 : /* Do the actual update. */
2730 63826 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2731 63826 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2732 : remoteslot);
2733 : }
2734 : else
2735 : {
2736 8 : TupleTableSlot *newslot = localslot;
2737 :
2738 : /* Store the new tuple for conflict reporting */
2739 8 : slot_store_data(newslot, relmapentry, newtup);
2740 :
2741 : /*
2742 : * The tuple to be updated could not be found. Do nothing except for
2743 : * emitting a log message.
2744 : */
2745 8 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
2746 8 : remoteslot, newslot, list_make1(&conflicttuple));
2747 : }
2748 :
2749 : /* Cleanup. */
2750 63830 : ExecCloseIndices(relinfo);
2751 63830 : EvalPlanQualEnd(&epqstate);
2752 63830 : }
2753 :
2754 : /*
2755 : * Handle DELETE message.
2756 : *
2757 : * TODO: FDW support
2758 : */
2759 : static void
2760 163866 : apply_handle_delete(StringInfo s)
2761 : {
2762 : LogicalRepRelMapEntry *rel;
2763 : LogicalRepTupleData oldtup;
2764 : LogicalRepRelId relid;
2765 : UserContext ucxt;
2766 : ApplyExecutionData *edata;
2767 : EState *estate;
2768 : TupleTableSlot *remoteslot;
2769 : MemoryContext oldctx;
2770 : bool run_as_owner;
2771 :
2772 : /*
2773 : * Quick return if we are skipping data modification changes or handling
2774 : * streamed transactions.
2775 : */
2776 327732 : if (is_skipping_changes() ||
2777 163866 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
2778 83230 : return;
2779 :
2780 80636 : begin_replication_step();
2781 :
2782 80636 : relid = logicalrep_read_delete(s, &oldtup);
2783 80636 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2784 80636 : if (!should_apply_changes_for_rel(rel))
2785 : {
2786 : /*
2787 : * The relation can't become interesting in the middle of the
2788 : * transaction so it's safe to unlock it.
2789 : */
2790 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2791 0 : end_replication_step();
2792 0 : return;
2793 : }
2794 :
2795 : /* Set relation for error callback */
2796 80636 : apply_error_callback_arg.rel = rel;
2797 :
2798 : /* Check if we can do the delete. */
2799 80636 : check_relation_updatable(rel);
2800 :
2801 : /*
2802 : * Make sure that any user-supplied code runs as the table owner, unless
2803 : * the user has opted out of that behavior.
2804 : */
2805 80636 : run_as_owner = MySubscription->runasowner;
2806 80636 : if (!run_as_owner)
2807 80632 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2808 :
2809 : /* Initialize the executor state. */
2810 80636 : edata = create_edata_for_relation(rel);
2811 80636 : estate = edata->estate;
2812 80636 : remoteslot = ExecInitExtraTupleSlot(estate,
2813 80636 : RelationGetDescr(rel->localrel),
2814 : &TTSOpsVirtual);
2815 :
2816 : /* Build the search tuple. */
2817 80636 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2818 80636 : slot_store_data(remoteslot, rel, &oldtup);
2819 80636 : MemoryContextSwitchTo(oldctx);
2820 :
2821 : /* For a partitioned table, apply delete to correct partition. */
2822 80636 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2823 34 : apply_handle_tuple_routing(edata,
2824 : remoteslot, NULL, CMD_DELETE);
2825 : else
2826 : {
2827 80602 : ResultRelInfo *relinfo = edata->targetRelInfo;
2828 :
2829 80602 : ExecOpenIndices(relinfo, false);
2830 80602 : apply_handle_delete_internal(edata, relinfo,
2831 : remoteslot, rel->localindexoid);
2832 80602 : ExecCloseIndices(relinfo);
2833 : }
2834 :
2835 80636 : finish_edata(edata);
2836 :
2837 : /* Reset relation for error callback */
2838 80636 : apply_error_callback_arg.rel = NULL;
2839 :
2840 80636 : if (!run_as_owner)
2841 80632 : RestoreUserContext(&ucxt);
2842 :
2843 80636 : logicalrep_rel_close(rel, NoLock);
2844 :
2845 80636 : end_replication_step();
2846 : }
2847 :
2848 : /*
2849 : * Workhorse for apply_handle_delete()
2850 : * relinfo is for the relation we're actually deleting from
2851 : * (could be a child partition of edata->targetRelInfo)
2852 : */
2853 : static void
2854 80636 : apply_handle_delete_internal(ApplyExecutionData *edata,
2855 : ResultRelInfo *relinfo,
2856 : TupleTableSlot *remoteslot,
2857 : Oid localindexoid)
2858 : {
2859 80636 : EState *estate = edata->estate;
2860 80636 : Relation localrel = relinfo->ri_RelationDesc;
2861 80636 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2862 : EPQState epqstate;
2863 : TupleTableSlot *localslot;
2864 80636 : ConflictTupleInfo conflicttuple = {0};
2865 : bool found;
2866 :
2867 80636 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2868 :
2869 : /* Caller should have opened indexes already. */
2870 : Assert(relinfo->ri_IndexRelationDescs != NULL ||
2871 : !localrel->rd_rel->relhasindex ||
2872 : RelationGetIndexList(localrel) == NIL);
2873 :
2874 80636 : found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2875 : remoteslot, &localslot);
2876 :
2877 : /* If found delete it. */
2878 80636 : if (found)
2879 : {
2880 : /*
2881 : * Report the conflict if the tuple was modified by a different
2882 : * origin.
2883 : */
2884 80618 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
2885 6 : &conflicttuple.origin, &conflicttuple.ts) &&
2886 6 : conflicttuple.origin != replorigin_session_origin)
2887 : {
2888 4 : conflicttuple.slot = localslot;
2889 4 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
2890 : remoteslot, NULL,
2891 4 : list_make1(&conflicttuple));
2892 : }
2893 :
2894 80618 : EvalPlanQualSetSlot(&epqstate, localslot);
2895 :
2896 : /* Do the actual delete. */
2897 80618 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
2898 80618 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2899 : }
2900 : else
2901 : {
2902 : /*
2903 : * The tuple to be deleted could not be found. Do nothing except for
2904 : * emitting a log message.
2905 : */
2906 18 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
2907 18 : remoteslot, NULL, list_make1(&conflicttuple));
2908 : }
2909 :
2910 : /* Cleanup. */
2911 80636 : EvalPlanQualEnd(&epqstate);
2912 80636 : }
2913 :
2914 : /*
2915 : * Try to find a tuple received from the publication side (in 'remoteslot') in
2916 : * the corresponding local relation using either replica identity index,
2917 : * primary key, index or if needed, sequential scan.
2918 : *
2919 : * Local tuple, if found, is returned in '*localslot'.
2920 : */
2921 : static bool
2922 144504 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
2923 : LogicalRepRelation *remoterel,
2924 : Oid localidxoid,
2925 : TupleTableSlot *remoteslot,
2926 : TupleTableSlot **localslot)
2927 : {
2928 144504 : EState *estate = edata->estate;
2929 : bool found;
2930 :
2931 : /*
2932 : * Regardless of the top-level operation, we're performing a read here, so
2933 : * check for SELECT privileges.
2934 : */
2935 144504 : TargetPrivilegesCheck(localrel, ACL_SELECT);
2936 :
2937 144496 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2938 :
2939 : Assert(OidIsValid(localidxoid) ||
2940 : (remoterel->replident == REPLICA_IDENTITY_FULL));
2941 :
2942 144496 : if (OidIsValid(localidxoid))
2943 : {
2944 : #ifdef USE_ASSERT_CHECKING
2945 : Relation idxrel = index_open(localidxoid, AccessShareLock);
2946 :
2947 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2948 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
2949 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
2950 : IsIndexUsableForReplicaIdentityFull(idxrel,
2951 : edata->targetRel->attrmap)));
2952 : index_close(idxrel, AccessShareLock);
2953 : #endif
2954 :
2955 144198 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
2956 : LockTupleExclusive,
2957 : remoteslot, *localslot);
2958 : }
2959 : else
2960 298 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2961 : remoteslot, *localslot);
2962 :
2963 144496 : return found;
2964 : }
2965 :
2966 : /*
2967 : * This handles insert, update, delete on a partitioned table.
2968 : */
2969 : static void
2970 150 : apply_handle_tuple_routing(ApplyExecutionData *edata,
2971 : TupleTableSlot *remoteslot,
2972 : LogicalRepTupleData *newtup,
2973 : CmdType operation)
2974 : {
2975 150 : EState *estate = edata->estate;
2976 150 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2977 150 : ResultRelInfo *relinfo = edata->targetRelInfo;
2978 150 : Relation parentrel = relinfo->ri_RelationDesc;
2979 : ModifyTableState *mtstate;
2980 : PartitionTupleRouting *proute;
2981 : ResultRelInfo *partrelinfo;
2982 : Relation partrel;
2983 : TupleTableSlot *remoteslot_part;
2984 : TupleConversionMap *map;
2985 : MemoryContext oldctx;
2986 150 : LogicalRepRelMapEntry *part_entry = NULL;
2987 150 : AttrMap *attrmap = NULL;
2988 :
2989 : /* ModifyTableState is needed for ExecFindPartition(). */
2990 150 : edata->mtstate = mtstate = makeNode(ModifyTableState);
2991 150 : mtstate->ps.plan = NULL;
2992 150 : mtstate->ps.state = estate;
2993 150 : mtstate->operation = operation;
2994 150 : mtstate->resultRelInfo = relinfo;
2995 :
2996 : /* ... as is PartitionTupleRouting. */
2997 150 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2998 :
2999 : /*
3000 : * Find the partition to which the "search tuple" belongs.
3001 : */
3002 : Assert(remoteslot != NULL);
3003 150 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3004 150 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
3005 : remoteslot, estate);
3006 : Assert(partrelinfo != NULL);
3007 150 : partrel = partrelinfo->ri_RelationDesc;
3008 :
3009 : /*
3010 : * Check for supported relkind. We need this since partitions might be of
3011 : * unsupported relkinds; and the set of partitions can change, so checking
3012 : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
3013 : */
3014 150 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
3015 150 : get_namespace_name(RelationGetNamespace(partrel)),
3016 150 : RelationGetRelationName(partrel));
3017 :
3018 : /*
3019 : * To perform any of the operations below, the tuple must match the
3020 : * partition's rowtype. Convert if needed or just copy, using a dedicated
3021 : * slot to store the tuple in any case.
3022 : */
3023 150 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3024 150 : if (remoteslot_part == NULL)
3025 84 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3026 150 : map = ExecGetRootToChildMap(partrelinfo, estate);
3027 150 : if (map != NULL)
3028 : {
3029 66 : attrmap = map->attrMap;
3030 66 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3031 : remoteslot_part);
3032 : }
3033 : else
3034 : {
3035 84 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3036 84 : slot_getallattrs(remoteslot_part);
3037 : }
3038 150 : MemoryContextSwitchTo(oldctx);
3039 :
3040 : /* Check if we can do the update or delete on the leaf partition. */
3041 150 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
3042 : {
3043 60 : part_entry = logicalrep_partition_open(relmapentry, partrel,
3044 : attrmap);
3045 60 : check_relation_updatable(part_entry);
3046 : }
3047 :
3048 150 : switch (operation)
3049 : {
3050 90 : case CMD_INSERT:
3051 90 : apply_handle_insert_internal(edata, partrelinfo,
3052 : remoteslot_part);
3053 88 : break;
3054 :
3055 34 : case CMD_DELETE:
3056 34 : apply_handle_delete_internal(edata, partrelinfo,
3057 : remoteslot_part,
3058 : part_entry->localindexoid);
3059 34 : break;
3060 :
3061 26 : case CMD_UPDATE:
3062 :
3063 : /*
3064 : * For UPDATE, depending on whether or not the updated tuple
3065 : * satisfies the partition's constraint, perform a simple UPDATE
3066 : * of the partition or move the updated tuple into a different
3067 : * suitable partition.
3068 : */
3069 : {
3070 : TupleTableSlot *localslot;
3071 : ResultRelInfo *partrelinfo_new;
3072 : Relation partrel_new;
3073 : bool found;
3074 : EPQState epqstate;
3075 26 : ConflictTupleInfo conflicttuple = {0};
3076 :
3077 : /* Get the matching local tuple from the partition. */
3078 26 : found = FindReplTupleInLocalRel(edata, partrel,
3079 : &part_entry->remoterel,
3080 : part_entry->localindexoid,
3081 : remoteslot_part, &localslot);
3082 26 : if (!found)
3083 : {
3084 4 : TupleTableSlot *newslot = localslot;
3085 :
3086 : /* Store the new tuple for conflict reporting */
3087 4 : slot_store_data(newslot, part_entry, newtup);
3088 :
3089 : /*
3090 : * The tuple to be updated could not be found. Do nothing
3091 : * except for emitting a log message.
3092 : */
3093 4 : ReportApplyConflict(estate, partrelinfo, LOG,
3094 : CT_UPDATE_MISSING, remoteslot_part,
3095 4 : newslot, list_make1(&conflicttuple));
3096 :
3097 4 : return;
3098 : }
3099 :
3100 : /*
3101 : * Report the conflict if the tuple was modified by a
3102 : * different origin.
3103 : */
3104 22 : if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
3105 : &conflicttuple.origin,
3106 2 : &conflicttuple.ts) &&
3107 2 : conflicttuple.origin != replorigin_session_origin)
3108 : {
3109 : TupleTableSlot *newslot;
3110 :
3111 : /* Store the new tuple for conflict reporting */
3112 2 : newslot = table_slot_create(partrel, &estate->es_tupleTable);
3113 2 : slot_store_data(newslot, part_entry, newtup);
3114 :
3115 2 : conflicttuple.slot = localslot;
3116 :
3117 2 : ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3118 : remoteslot_part, newslot,
3119 2 : list_make1(&conflicttuple));
3120 : }
3121 :
3122 : /*
3123 : * Apply the update to the local tuple, putting the result in
3124 : * remoteslot_part.
3125 : */
3126 22 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3127 22 : slot_modify_data(remoteslot_part, localslot, part_entry,
3128 : newtup);
3129 22 : MemoryContextSwitchTo(oldctx);
3130 :
3131 22 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3132 :
3133 : /*
3134 : * Does the updated tuple still satisfy the current
3135 : * partition's constraint?
3136 : */
3137 44 : if (!partrel->rd_rel->relispartition ||
3138 22 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3139 : false))
3140 : {
3141 : /*
3142 : * Yes, so simply UPDATE the partition. We don't call
3143 : * apply_handle_update_internal() here, which would
3144 : * normally do the following work, to avoid repeating some
3145 : * work already done above to find the local tuple in the
3146 : * partition.
3147 : */
3148 20 : InitConflictIndexes(partrelinfo);
3149 :
3150 20 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3151 20 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3152 : ACL_UPDATE);
3153 20 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3154 : localslot, remoteslot_part);
3155 : }
3156 : else
3157 : {
3158 : /* Move the tuple into the new partition. */
3159 :
3160 : /*
3161 : * New partition will be found using tuple routing, which
3162 : * can only occur via the parent table. We might need to
3163 : * convert the tuple to the parent's rowtype. Note that
3164 : * this is the tuple found in the partition, not the
3165 : * original search tuple received by this function.
3166 : */
3167 2 : if (map)
3168 : {
3169 : TupleConversionMap *PartitionToRootMap =
3170 2 : convert_tuples_by_name(RelationGetDescr(partrel),
3171 : RelationGetDescr(parentrel));
3172 :
3173 : remoteslot =
3174 2 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3175 : remoteslot_part, remoteslot);
3176 : }
3177 : else
3178 : {
3179 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3180 0 : slot_getallattrs(remoteslot);
3181 : }
3182 :
3183 : /* Find the new partition. */
3184 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3185 2 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3186 : proute, remoteslot,
3187 : estate);
3188 2 : MemoryContextSwitchTo(oldctx);
3189 : Assert(partrelinfo_new != partrelinfo);
3190 2 : partrel_new = partrelinfo_new->ri_RelationDesc;
3191 :
3192 : /* Check that new partition also has supported relkind. */
3193 2 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3194 2 : get_namespace_name(RelationGetNamespace(partrel_new)),
3195 2 : RelationGetRelationName(partrel_new));
3196 :
3197 : /* DELETE old tuple found in the old partition. */
3198 2 : EvalPlanQualSetSlot(&epqstate, localslot);
3199 2 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3200 2 : ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3201 :
3202 : /* INSERT new tuple into the new partition. */
3203 :
3204 : /*
3205 : * Convert the replacement tuple to match the destination
3206 : * partition rowtype.
3207 : */
3208 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3209 2 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3210 2 : if (remoteslot_part == NULL)
3211 2 : remoteslot_part = table_slot_create(partrel_new,
3212 : &estate->es_tupleTable);
3213 2 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
3214 2 : if (map != NULL)
3215 : {
3216 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3217 : remoteslot,
3218 : remoteslot_part);
3219 : }
3220 : else
3221 : {
3222 2 : remoteslot_part = ExecCopySlot(remoteslot_part,
3223 : remoteslot);
3224 2 : slot_getallattrs(remoteslot);
3225 : }
3226 2 : MemoryContextSwitchTo(oldctx);
3227 2 : apply_handle_insert_internal(edata, partrelinfo_new,
3228 : remoteslot_part);
3229 : }
3230 :
3231 22 : EvalPlanQualEnd(&epqstate);
3232 : }
3233 22 : break;
3234 :
3235 0 : default:
3236 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3237 : break;
3238 : }
3239 : }
3240 :
3241 : /*
3242 : * Handle TRUNCATE message.
3243 : *
3244 : * TODO: FDW support
3245 : */
3246 : static void
3247 38 : apply_handle_truncate(StringInfo s)
3248 : {
3249 38 : bool cascade = false;
3250 38 : bool restart_seqs = false;
3251 38 : List *remote_relids = NIL;
3252 38 : List *remote_rels = NIL;
3253 38 : List *rels = NIL;
3254 38 : List *part_rels = NIL;
3255 38 : List *relids = NIL;
3256 38 : List *relids_logged = NIL;
3257 : ListCell *lc;
3258 38 : LOCKMODE lockmode = AccessExclusiveLock;
3259 :
3260 : /*
3261 : * Quick return if we are skipping data modification changes or handling
3262 : * streamed transactions.
3263 : */
3264 76 : if (is_skipping_changes() ||
3265 38 : handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
3266 0 : return;
3267 :
3268 38 : begin_replication_step();
3269 :
3270 38 : remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3271 :
3272 94 : foreach(lc, remote_relids)
3273 : {
3274 56 : LogicalRepRelId relid = lfirst_oid(lc);
3275 : LogicalRepRelMapEntry *rel;
3276 :
3277 56 : rel = logicalrep_rel_open(relid, lockmode);
3278 56 : if (!should_apply_changes_for_rel(rel))
3279 : {
3280 : /*
3281 : * The relation can't become interesting in the middle of the
3282 : * transaction so it's safe to unlock it.
3283 : */
3284 0 : logicalrep_rel_close(rel, lockmode);
3285 0 : continue;
3286 : }
3287 :
3288 56 : remote_rels = lappend(remote_rels, rel);
3289 56 : TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
3290 56 : rels = lappend(rels, rel->localrel);
3291 56 : relids = lappend_oid(relids, rel->localreloid);
3292 56 : if (RelationIsLogicallyLogged(rel->localrel))
3293 0 : relids_logged = lappend_oid(relids_logged, rel->localreloid);
3294 :
3295 : /*
3296 : * Truncate partitions if we got a message to truncate a partitioned
3297 : * table.
3298 : */
3299 56 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3300 : {
3301 : ListCell *child;
3302 8 : List *children = find_all_inheritors(rel->localreloid,
3303 : lockmode,
3304 : NULL);
3305 :
3306 30 : foreach(child, children)
3307 : {
3308 22 : Oid childrelid = lfirst_oid(child);
3309 : Relation childrel;
3310 :
3311 22 : if (list_member_oid(relids, childrelid))
3312 8 : continue;
3313 :
3314 : /* find_all_inheritors already got lock */
3315 14 : childrel = table_open(childrelid, NoLock);
3316 :
3317 : /*
3318 : * Ignore temp tables of other backends. See similar code in
3319 : * ExecuteTruncate().
3320 : */
3321 14 : if (RELATION_IS_OTHER_TEMP(childrel))
3322 : {
3323 0 : table_close(childrel, lockmode);
3324 0 : continue;
3325 : }
3326 :
3327 14 : TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
3328 14 : rels = lappend(rels, childrel);
3329 14 : part_rels = lappend(part_rels, childrel);
3330 14 : relids = lappend_oid(relids, childrelid);
3331 : /* Log this relation only if needed for logical decoding */
3332 14 : if (RelationIsLogicallyLogged(childrel))
3333 0 : relids_logged = lappend_oid(relids_logged, childrelid);
3334 : }
3335 : }
3336 : }
3337 :
3338 : /*
3339 : * Even if we used CASCADE on the upstream primary we explicitly default
3340 : * to replaying changes without further cascading. This might be later
3341 : * changeable with a user specified option.
3342 : *
3343 : * MySubscription->runasowner tells us whether we want to execute
3344 : * replication actions as the subscription owner; the last argument to
3345 : * TruncateGuts tells it whether we want to switch to the table owner.
3346 : * Those are exactly opposite conditions.
3347 : */
3348 38 : ExecuteTruncateGuts(rels,
3349 : relids,
3350 : relids_logged,
3351 : DROP_RESTRICT,
3352 : restart_seqs,
3353 38 : !MySubscription->runasowner);
3354 94 : foreach(lc, remote_rels)
3355 : {
3356 56 : LogicalRepRelMapEntry *rel = lfirst(lc);
3357 :
3358 56 : logicalrep_rel_close(rel, NoLock);
3359 : }
3360 52 : foreach(lc, part_rels)
3361 : {
3362 14 : Relation rel = lfirst(lc);
3363 :
3364 14 : table_close(rel, NoLock);
3365 : }
3366 :
3367 38 : end_replication_step();
3368 : }
3369 :
3370 :
3371 : /*
3372 : * Logical replication protocol message dispatcher.
3373 : */
3374 : void
3375 675230 : apply_dispatch(StringInfo s)
3376 : {
3377 675230 : LogicalRepMsgType action = pq_getmsgbyte(s);
3378 : LogicalRepMsgType saved_command;
3379 :
3380 : /*
3381 : * Set the current command being applied. Since this function can be
3382 : * called recursively when applying spooled changes, save the current
3383 : * command.
3384 : */
3385 675230 : saved_command = apply_error_callback_arg.command;
3386 675230 : apply_error_callback_arg.command = action;
3387 :
3388 675230 : switch (action)
3389 : {
3390 906 : case LOGICAL_REP_MSG_BEGIN:
3391 906 : apply_handle_begin(s);
3392 906 : break;
3393 :
3394 848 : case LOGICAL_REP_MSG_COMMIT:
3395 848 : apply_handle_commit(s);
3396 848 : break;
3397 :
3398 372590 : case LOGICAL_REP_MSG_INSERT:
3399 372590 : apply_handle_insert(s);
3400 372544 : break;
3401 :
3402 132318 : case LOGICAL_REP_MSG_UPDATE:
3403 132318 : apply_handle_update(s);
3404 132302 : break;
3405 :
3406 163866 : case LOGICAL_REP_MSG_DELETE:
3407 163866 : apply_handle_delete(s);
3408 163866 : break;
3409 :
3410 38 : case LOGICAL_REP_MSG_TRUNCATE:
3411 38 : apply_handle_truncate(s);
3412 38 : break;
3413 :
3414 864 : case LOGICAL_REP_MSG_RELATION:
3415 864 : apply_handle_relation(s);
3416 864 : break;
3417 :
3418 36 : case LOGICAL_REP_MSG_TYPE:
3419 36 : apply_handle_type(s);
3420 36 : break;
3421 :
3422 14 : case LOGICAL_REP_MSG_ORIGIN:
3423 14 : apply_handle_origin(s);
3424 14 : break;
3425 :
3426 0 : case LOGICAL_REP_MSG_MESSAGE:
3427 :
3428 : /*
3429 : * Logical replication does not use generic logical messages yet.
3430 : * Although, it could be used by other applications that use this
3431 : * output plugin.
3432 : */
3433 0 : break;
3434 :
3435 1710 : case LOGICAL_REP_MSG_STREAM_START:
3436 1710 : apply_handle_stream_start(s);
3437 1710 : break;
3438 :
3439 1708 : case LOGICAL_REP_MSG_STREAM_STOP:
3440 1708 : apply_handle_stream_stop(s);
3441 1704 : break;
3442 :
3443 76 : case LOGICAL_REP_MSG_STREAM_ABORT:
3444 76 : apply_handle_stream_abort(s);
3445 76 : break;
3446 :
3447 122 : case LOGICAL_REP_MSG_STREAM_COMMIT:
3448 122 : apply_handle_stream_commit(s);
3449 118 : break;
3450 :
3451 32 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3452 32 : apply_handle_begin_prepare(s);
3453 32 : break;
3454 :
3455 30 : case LOGICAL_REP_MSG_PREPARE:
3456 30 : apply_handle_prepare(s);
3457 28 : break;
3458 :
3459 40 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
3460 40 : apply_handle_commit_prepared(s);
3461 40 : break;
3462 :
3463 10 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
3464 10 : apply_handle_rollback_prepared(s);
3465 10 : break;
3466 :
3467 22 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3468 22 : apply_handle_stream_prepare(s);
3469 22 : break;
3470 :
3471 0 : default:
3472 0 : ereport(ERROR,
3473 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3474 : errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3475 : }
3476 :
3477 : /* Reset the current command */
3478 675158 : apply_error_callback_arg.command = saved_command;
3479 675158 : }
3480 :
3481 : /*
3482 : * Figure out which write/flush positions to report to the walsender process.
3483 : *
3484 : * We can't simply report back the last LSN the walsender sent us because the
3485 : * local transaction might not yet be flushed to disk locally. Instead we
3486 : * build a list that associates local with remote LSNs for every commit. When
3487 : * reporting back the flush position to the sender we iterate that list and
3488 : * check which entries on it are already locally flushed. Those we can report
3489 : * as having been flushed.
3490 : *
3491 : * The have_pending_txes is true if there are outstanding transactions that
3492 : * need to be flushed.
3493 : */
3494 : static void
3495 96504 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3496 : bool *have_pending_txes)
3497 : {
3498 : dlist_mutable_iter iter;
3499 96504 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3500 :
3501 96504 : *write = InvalidXLogRecPtr;
3502 96504 : *flush = InvalidXLogRecPtr;
3503 :
3504 97426 : dlist_foreach_modify(iter, &lsn_mapping)
3505 : {
3506 29566 : FlushPosition *pos =
3507 29566 : dlist_container(FlushPosition, node, iter.cur);
3508 :
3509 29566 : *write = pos->remote_end;
3510 :
3511 29566 : if (pos->local_end <= local_flush)
3512 : {
3513 922 : *flush = pos->remote_end;
3514 922 : dlist_delete(iter.cur);
3515 922 : pfree(pos);
3516 : }
3517 : else
3518 : {
3519 : /*
3520 : * Don't want to uselessly iterate over the rest of the list which
3521 : * could potentially be long. Instead get the last element and
3522 : * grab the write position from there.
3523 : */
3524 28644 : pos = dlist_tail_element(FlushPosition, node,
3525 : &lsn_mapping);
3526 28644 : *write = pos->remote_end;
3527 28644 : *have_pending_txes = true;
3528 28644 : return;
3529 : }
3530 : }
3531 :
3532 67860 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3533 : }
3534 :
3535 : /*
3536 : * Store current remote/local lsn pair in the tracking list.
3537 : */
3538 : void
3539 1060 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3540 : {
3541 : FlushPosition *flushpos;
3542 :
3543 : /*
3544 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3545 : * by the leader apply worker.
3546 : */
3547 1060 : if (am_parallel_apply_worker())
3548 38 : return;
3549 :
3550 : /* Need to do this in permanent context */
3551 1022 : MemoryContextSwitchTo(ApplyContext);
3552 :
3553 : /* Track commit lsn */
3554 1022 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3555 1022 : flushpos->local_end = local_lsn;
3556 1022 : flushpos->remote_end = remote_lsn;
3557 :
3558 1022 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3559 1022 : MemoryContextSwitchTo(ApplyMessageContext);
3560 : }
3561 :
3562 :
3563 : /* Update statistics of the worker. */
3564 : static void
3565 376888 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3566 : {
3567 376888 : MyLogicalRepWorker->last_lsn = last_lsn;
3568 376888 : MyLogicalRepWorker->last_send_time = send_time;
3569 376888 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3570 376888 : if (reply)
3571 : {
3572 7310 : MyLogicalRepWorker->reply_lsn = last_lsn;
3573 7310 : MyLogicalRepWorker->reply_time = send_time;
3574 : }
3575 376888 : }
3576 :
3577 : /*
3578 : * Apply main loop.
3579 : */
3580 : static void
3581 740 : LogicalRepApplyLoop(XLogRecPtr last_received)
3582 : {
3583 740 : TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3584 740 : bool ping_sent = false;
3585 : TimeLineID tli;
3586 : ErrorContextCallback errcallback;
3587 :
3588 : /*
3589 : * Init the ApplyMessageContext which we clean up after each replication
3590 : * protocol message.
3591 : */
3592 740 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
3593 : "ApplyMessageContext",
3594 : ALLOCSET_DEFAULT_SIZES);
3595 :
3596 : /*
3597 : * This memory context is used for per-stream data when the streaming mode
3598 : * is enabled. This context is reset on each stream stop.
3599 : */
3600 740 : LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
3601 : "LogicalStreamingContext",
3602 : ALLOCSET_DEFAULT_SIZES);
3603 :
3604 : /* mark as idle, before starting to loop */
3605 740 : pgstat_report_activity(STATE_IDLE, NULL);
3606 :
3607 : /*
3608 : * Push apply error context callback. Fields will be filled while applying
3609 : * a change.
3610 : */
3611 740 : errcallback.callback = apply_error_callback;
3612 740 : errcallback.previous = error_context_stack;
3613 740 : error_context_stack = &errcallback;
3614 740 : apply_error_context_stack = error_context_stack;
3615 :
3616 : /* This outer loop iterates once per wait. */
3617 : for (;;)
3618 87928 : {
3619 88668 : pgsocket fd = PGINVALID_SOCKET;
3620 : int rc;
3621 : int len;
3622 88668 : char *buf = NULL;
3623 88668 : bool endofstream = false;
3624 : long wait_time;
3625 :
3626 88668 : CHECK_FOR_INTERRUPTS();
3627 :
3628 88668 : MemoryContextSwitchTo(ApplyMessageContext);
3629 :
3630 88668 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3631 :
3632 88634 : if (len != 0)
3633 : {
3634 : /* Loop to process all available data (without blocking). */
3635 : for (;;)
3636 : {
3637 463314 : CHECK_FOR_INTERRUPTS();
3638 :
3639 463314 : if (len == 0)
3640 : {
3641 86408 : break;
3642 : }
3643 376906 : else if (len < 0)
3644 : {
3645 18 : ereport(LOG,
3646 : (errmsg("data stream from publisher has ended")));
3647 18 : endofstream = true;
3648 18 : break;
3649 : }
3650 : else
3651 : {
3652 : int c;
3653 : StringInfoData s;
3654 :
3655 376888 : if (ConfigReloadPending)
3656 : {
3657 0 : ConfigReloadPending = false;
3658 0 : ProcessConfigFile(PGC_SIGHUP);
3659 : }
3660 :
3661 : /* Reset timeout. */
3662 376888 : last_recv_timestamp = GetCurrentTimestamp();
3663 376888 : ping_sent = false;
3664 :
3665 : /* Ensure we are reading the data into our memory context. */
3666 376888 : MemoryContextSwitchTo(ApplyMessageContext);
3667 :
3668 376888 : initReadOnlyStringInfo(&s, buf, len);
3669 :
3670 376888 : c = pq_getmsgbyte(&s);
3671 :
3672 376888 : if (c == 'w')
3673 : {
3674 : XLogRecPtr start_lsn;
3675 : XLogRecPtr end_lsn;
3676 : TimestampTz send_time;
3677 :
3678 369578 : start_lsn = pq_getmsgint64(&s);
3679 369578 : end_lsn = pq_getmsgint64(&s);
3680 369578 : send_time = pq_getmsgint64(&s);
3681 :
3682 369578 : if (last_received < start_lsn)
3683 309520 : last_received = start_lsn;
3684 :
3685 369578 : if (last_received < end_lsn)
3686 0 : last_received = end_lsn;
3687 :
3688 369578 : UpdateWorkerStats(last_received, send_time, false);
3689 :
3690 369578 : apply_dispatch(&s);
3691 : }
3692 7310 : else if (c == 'k')
3693 : {
3694 : XLogRecPtr end_lsn;
3695 : TimestampTz timestamp;
3696 : bool reply_requested;
3697 :
3698 7310 : end_lsn = pq_getmsgint64(&s);
3699 7310 : timestamp = pq_getmsgint64(&s);
3700 7310 : reply_requested = pq_getmsgbyte(&s);
3701 :
3702 7310 : if (last_received < end_lsn)
3703 1680 : last_received = end_lsn;
3704 :
3705 7310 : send_feedback(last_received, reply_requested, false);
3706 7310 : UpdateWorkerStats(last_received, timestamp, true);
3707 : }
3708 : /* other message types are purposefully ignored */
3709 :
3710 376822 : MemoryContextReset(ApplyMessageContext);
3711 : }
3712 :
3713 376822 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3714 : }
3715 : }
3716 :
3717 : /* confirm all writes so far */
3718 88568 : send_feedback(last_received, false, false);
3719 :
3720 88568 : if (!in_remote_transaction && !in_streamed_transaction)
3721 : {
3722 : /*
3723 : * If we didn't get any transactions for a while there might be
3724 : * unconsumed invalidation messages in the queue, consume them
3725 : * now.
3726 : */
3727 9776 : AcceptInvalidationMessages();
3728 9776 : maybe_reread_subscription();
3729 :
3730 : /* Process any table synchronization changes. */
3731 9708 : process_syncing_tables(last_received);
3732 : }
3733 :
3734 : /* Cleanup the memory. */
3735 88124 : MemoryContextReset(ApplyMessageContext);
3736 88124 : MemoryContextSwitchTo(TopMemoryContext);
3737 :
3738 : /* Check if we need to exit the streaming loop. */
3739 88124 : if (endofstream)
3740 18 : break;
3741 :
3742 : /*
3743 : * Wait for more data or latch. If we have unflushed transactions,
3744 : * wake up after WalWriterDelay to see if they've been flushed yet (in
3745 : * which case we should send a feedback message). Otherwise, there's
3746 : * no particular urgency about waking up unless we get data or a
3747 : * signal.
3748 : */
3749 88106 : if (!dlist_is_empty(&lsn_mapping))
3750 23468 : wait_time = WalWriterDelay;
3751 : else
3752 64638 : wait_time = NAPTIME_PER_CYCLE;
3753 :
3754 88106 : rc = WaitLatchOrSocket(MyLatch,
3755 : WL_SOCKET_READABLE | WL_LATCH_SET |
3756 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
3757 : fd, wait_time,
3758 : WAIT_EVENT_LOGICAL_APPLY_MAIN);
3759 :
3760 88106 : if (rc & WL_LATCH_SET)
3761 : {
3762 1036 : ResetLatch(MyLatch);
3763 1036 : CHECK_FOR_INTERRUPTS();
3764 : }
3765 :
3766 87928 : if (ConfigReloadPending)
3767 : {
3768 16 : ConfigReloadPending = false;
3769 16 : ProcessConfigFile(PGC_SIGHUP);
3770 : }
3771 :
3772 87928 : if (rc & WL_TIMEOUT)
3773 : {
3774 : /*
3775 : * We didn't receive anything new. If we haven't heard anything
3776 : * from the server for more than wal_receiver_timeout / 2, ping
3777 : * the server. Also, if it's been longer than
3778 : * wal_receiver_status_interval since the last update we sent,
3779 : * send a status update to the primary anyway, to report any
3780 : * progress in applying WAL.
3781 : */
3782 626 : bool requestReply = false;
3783 :
3784 : /*
3785 : * Check if time since last receive from primary has reached the
3786 : * configured limit.
3787 : */
3788 626 : if (wal_receiver_timeout > 0)
3789 : {
3790 626 : TimestampTz now = GetCurrentTimestamp();
3791 : TimestampTz timeout;
3792 :
3793 626 : timeout =
3794 626 : TimestampTzPlusMilliseconds(last_recv_timestamp,
3795 : wal_receiver_timeout);
3796 :
3797 626 : if (now >= timeout)
3798 0 : ereport(ERROR,
3799 : (errcode(ERRCODE_CONNECTION_FAILURE),
3800 : errmsg("terminating logical replication worker due to timeout")));
3801 :
3802 : /* Check to see if it's time for a ping. */
3803 626 : if (!ping_sent)
3804 : {
3805 626 : timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3806 : (wal_receiver_timeout / 2));
3807 626 : if (now >= timeout)
3808 : {
3809 8 : requestReply = true;
3810 8 : ping_sent = true;
3811 : }
3812 : }
3813 : }
3814 :
3815 626 : send_feedback(last_received, requestReply, requestReply);
3816 :
3817 : /*
3818 : * Force reporting to ensure long idle periods don't lead to
3819 : * arbitrarily delayed stats. Stats can only be reported outside
3820 : * of (implicit or explicit) transactions. That shouldn't lead to
3821 : * stats being delayed for long, because transactions are either
3822 : * sent as a whole on commit or streamed. Streamed transactions
3823 : * are spilled to disk and applied on commit.
3824 : */
3825 626 : if (!IsTransactionState())
3826 626 : pgstat_report_stat(true);
3827 : }
3828 : }
3829 :
3830 : /* Pop the error context stack */
3831 18 : error_context_stack = errcallback.previous;
3832 18 : apply_error_context_stack = error_context_stack;
3833 :
3834 : /* All done */
3835 18 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
3836 0 : }
3837 :
3838 : /*
3839 : * Send a Standby Status Update message to server.
3840 : *
3841 : * 'recvpos' is the latest LSN we've received data to, force is set if we need
3842 : * to send a response to avoid timeouts.
3843 : */
3844 : static void
3845 96504 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3846 : {
3847 : static StringInfo reply_message = NULL;
3848 : static TimestampTz send_time = 0;
3849 :
3850 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3851 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3852 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3853 :
3854 : XLogRecPtr writepos;
3855 : XLogRecPtr flushpos;
3856 : TimestampTz now;
3857 : bool have_pending_txes;
3858 :
3859 : /*
3860 : * If the user doesn't want status to be reported to the publisher, be
3861 : * sure to exit before doing anything at all.
3862 : */
3863 96504 : if (!force && wal_receiver_status_interval <= 0)
3864 40086 : return;
3865 :
3866 : /* It's legal to not pass a recvpos */
3867 96504 : if (recvpos < last_recvpos)
3868 0 : recvpos = last_recvpos;
3869 :
3870 96504 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
3871 :
3872 : /*
3873 : * No outstanding transactions to flush, we can report the latest received
3874 : * position. This is important for synchronous replication.
3875 : */
3876 96504 : if (!have_pending_txes)
3877 67860 : flushpos = writepos = recvpos;
3878 :
3879 96504 : if (writepos < last_writepos)
3880 0 : writepos = last_writepos;
3881 :
3882 96504 : if (flushpos < last_flushpos)
3883 28566 : flushpos = last_flushpos;
3884 :
3885 96504 : now = GetCurrentTimestamp();
3886 :
3887 : /* if we've already reported everything we're good */
3888 96504 : if (!force &&
3889 92372 : writepos == last_writepos &&
3890 40586 : flushpos == last_flushpos &&
3891 40344 : !TimestampDifferenceExceeds(send_time, now,
3892 : wal_receiver_status_interval * 1000))
3893 40086 : return;
3894 56418 : send_time = now;
3895 :
3896 56418 : if (!reply_message)
3897 : {
3898 740 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
3899 :
3900 740 : reply_message = makeStringInfo();
3901 740 : MemoryContextSwitchTo(oldctx);
3902 : }
3903 : else
3904 55678 : resetStringInfo(reply_message);
3905 :
3906 56418 : pq_sendbyte(reply_message, 'r');
3907 56418 : pq_sendint64(reply_message, recvpos); /* write */
3908 56418 : pq_sendint64(reply_message, flushpos); /* flush */
3909 56418 : pq_sendint64(reply_message, writepos); /* apply */
3910 56418 : pq_sendint64(reply_message, now); /* sendTime */
3911 56418 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
3912 :
3913 56418 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3914 : force,
3915 : LSN_FORMAT_ARGS(recvpos),
3916 : LSN_FORMAT_ARGS(writepos),
3917 : LSN_FORMAT_ARGS(flushpos));
3918 :
3919 56418 : walrcv_send(LogRepWorkerWalRcvConn,
3920 : reply_message->data, reply_message->len);
3921 :
3922 56418 : if (recvpos > last_recvpos)
3923 51794 : last_recvpos = recvpos;
3924 56418 : if (writepos > last_writepos)
3925 51792 : last_writepos = writepos;
3926 56418 : if (flushpos > last_flushpos)
3927 51268 : last_flushpos = flushpos;
3928 : }
3929 :
3930 : /*
3931 : * Exit routine for apply workers due to subscription parameter changes.
3932 : */
3933 : static void
3934 74 : apply_worker_exit(void)
3935 : {
3936 74 : if (am_parallel_apply_worker())
3937 : {
3938 : /*
3939 : * Don't stop the parallel apply worker as the leader will detect the
3940 : * subscription parameter change and restart logical replication later
3941 : * anyway. This also prevents the leader from reporting errors when
3942 : * trying to communicate with a stopped parallel apply worker, which
3943 : * would accidentally disable subscriptions if disable_on_error was
3944 : * set.
3945 : */
3946 0 : return;
3947 : }
3948 :
3949 : /*
3950 : * Reset the last-start time for this apply worker so that the launcher
3951 : * will restart it without waiting for wal_retrieve_retry_interval if the
3952 : * subscription is still active, and so that we won't leak that hash table
3953 : * entry if it isn't.
3954 : */
3955 74 : if (am_leader_apply_worker())
3956 74 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
3957 :
3958 74 : proc_exit(0);
3959 : }
3960 :
3961 : /*
3962 : * Reread subscription info if needed.
3963 : *
3964 : * For significant changes, we react by exiting the current process; a new
3965 : * one will be launched afterwards if needed.
3966 : */
3967 : void
3968 11740 : maybe_reread_subscription(void)
3969 : {
3970 : MemoryContext oldctx;
3971 : Subscription *newsub;
3972 11740 : bool started_tx = false;
3973 :
3974 : /* When cache state is valid there is nothing to do here. */
3975 11740 : if (MySubscriptionValid)
3976 11586 : return;
3977 :
3978 : /* This function might be called inside or outside of transaction. */
3979 154 : if (!IsTransactionState())
3980 : {
3981 144 : StartTransactionCommand();
3982 144 : started_tx = true;
3983 : }
3984 :
3985 : /* Ensure allocations in permanent context. */
3986 154 : oldctx = MemoryContextSwitchTo(ApplyContext);
3987 :
3988 154 : newsub = GetSubscription(MyLogicalRepWorker->subid, true);
3989 :
3990 : /*
3991 : * Exit if the subscription was removed. This normally should not happen
3992 : * as the worker gets killed during DROP SUBSCRIPTION.
3993 : */
3994 154 : if (!newsub)
3995 : {
3996 0 : ereport(LOG,
3997 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3998 : MySubscription->name)));
3999 :
4000 : /* Ensure we remove no-longer-useful entry for worker's start time */
4001 0 : if (am_leader_apply_worker())
4002 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4003 :
4004 0 : proc_exit(0);
4005 : }
4006 :
4007 : /* Exit if the subscription was disabled. */
4008 154 : if (!newsub->enabled)
4009 : {
4010 18 : ereport(LOG,
4011 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
4012 : MySubscription->name)));
4013 :
4014 18 : apply_worker_exit();
4015 : }
4016 :
4017 : /* !slotname should never happen when enabled is true. */
4018 : Assert(newsub->slotname);
4019 :
4020 : /* two-phase cannot be altered while the worker is running */
4021 : Assert(newsub->twophasestate == MySubscription->twophasestate);
4022 :
4023 : /*
4024 : * Exit if any parameter that affects the remote connection was changed.
4025 : * The launcher will start a new worker but note that the parallel apply
4026 : * worker won't restart if the streaming option's value is changed from
4027 : * 'parallel' to any other value or the server decides not to stream the
4028 : * in-progress transaction.
4029 : */
4030 136 : if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
4031 132 : strcmp(newsub->name, MySubscription->name) != 0 ||
4032 130 : strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
4033 130 : newsub->binary != MySubscription->binary ||
4034 118 : newsub->stream != MySubscription->stream ||
4035 108 : newsub->passwordrequired != MySubscription->passwordrequired ||
4036 108 : strcmp(newsub->origin, MySubscription->origin) != 0 ||
4037 108 : newsub->owner != MySubscription->owner ||
4038 106 : !equal(newsub->publications, MySubscription->publications))
4039 : {
4040 48 : if (am_parallel_apply_worker())
4041 0 : ereport(LOG,
4042 : (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
4043 : MySubscription->name)));
4044 : else
4045 48 : ereport(LOG,
4046 : (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
4047 : MySubscription->name)));
4048 :
4049 48 : apply_worker_exit();
4050 : }
4051 :
4052 : /*
4053 : * Exit if the subscription owner's superuser privileges have been
4054 : * revoked.
4055 : */
4056 88 : if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
4057 : {
4058 8 : if (am_parallel_apply_worker())
4059 0 : ereport(LOG,
4060 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
4061 : MySubscription->name));
4062 : else
4063 8 : ereport(LOG,
4064 : errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
4065 : MySubscription->name));
4066 :
4067 8 : apply_worker_exit();
4068 : }
4069 :
4070 : /* Check for other changes that should never happen too. */
4071 80 : if (newsub->dbid != MySubscription->dbid)
4072 : {
4073 0 : elog(ERROR, "subscription %u changed unexpectedly",
4074 : MyLogicalRepWorker->subid);
4075 : }
4076 :
4077 : /* Clean old subscription info and switch to new one. */
4078 80 : FreeSubscription(MySubscription);
4079 80 : MySubscription = newsub;
4080 :
4081 80 : MemoryContextSwitchTo(oldctx);
4082 :
4083 : /* Change synchronous commit according to the user's wishes */
4084 80 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
4085 : PGC_BACKEND, PGC_S_OVERRIDE);
4086 :
4087 80 : if (started_tx)
4088 76 : CommitTransactionCommand();
4089 :
4090 80 : MySubscriptionValid = true;
4091 : }
4092 :
4093 : /*
4094 : * Callback from subscription syscache invalidation.
4095 : */
4096 : static void
4097 162 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
4098 : {
4099 162 : MySubscriptionValid = false;
4100 162 : }
4101 :
4102 : /*
4103 : * subxact_info_write
4104 : * Store information about subxacts for a toplevel transaction.
4105 : *
4106 : * For each subxact we store offset of its first change in the main file.
4107 : * The file is always over-written as a whole.
4108 : *
4109 : * XXX We should only store subxacts that were not aborted yet.
4110 : */
4111 : static void
4112 744 : subxact_info_write(Oid subid, TransactionId xid)
4113 : {
4114 : char path[MAXPGPATH];
4115 : Size len;
4116 : BufFile *fd;
4117 :
4118 : Assert(TransactionIdIsValid(xid));
4119 :
4120 : /* construct the subxact filename */
4121 744 : subxact_filename(path, subid, xid);
4122 :
4123 : /* Delete the subxacts file, if exists. */
4124 744 : if (subxact_data.nsubxacts == 0)
4125 : {
4126 580 : cleanup_subxact_info();
4127 580 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4128 :
4129 580 : return;
4130 : }
4131 :
4132 : /*
4133 : * Create the subxact file if it not already created, otherwise open the
4134 : * existing file.
4135 : */
4136 164 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
4137 : true);
4138 164 : if (fd == NULL)
4139 16 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
4140 :
4141 164 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4142 :
4143 : /* Write the subxact count and subxact info */
4144 164 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4145 164 : BufFileWrite(fd, subxact_data.subxacts, len);
4146 :
4147 164 : BufFileClose(fd);
4148 :
4149 : /* free the memory allocated for subxact info */
4150 164 : cleanup_subxact_info();
4151 : }
4152 :
4153 : /*
4154 : * subxact_info_read
4155 : * Restore information about subxacts of a streamed transaction.
4156 : *
4157 : * Read information about subxacts into the structure subxact_data that can be
4158 : * used later.
4159 : */
4160 : static void
4161 688 : subxact_info_read(Oid subid, TransactionId xid)
4162 : {
4163 : char path[MAXPGPATH];
4164 : Size len;
4165 : BufFile *fd;
4166 : MemoryContext oldctx;
4167 :
4168 : Assert(!subxact_data.subxacts);
4169 : Assert(subxact_data.nsubxacts == 0);
4170 : Assert(subxact_data.nsubxacts_max == 0);
4171 :
4172 : /*
4173 : * If the subxact file doesn't exist that means we don't have any subxact
4174 : * info.
4175 : */
4176 688 : subxact_filename(path, subid, xid);
4177 688 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
4178 : true);
4179 688 : if (fd == NULL)
4180 530 : return;
4181 :
4182 : /* read number of subxact items */
4183 158 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4184 :
4185 158 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4186 :
4187 : /* we keep the maximum as a power of 2 */
4188 158 : subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
4189 :
4190 : /*
4191 : * Allocate subxact information in the logical streaming context. We need
4192 : * this information during the complete stream so that we can add the sub
4193 : * transaction info to this. On stream stop we will flush this information
4194 : * to the subxact file and reset the logical streaming context.
4195 : */
4196 158 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4197 158 : subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
4198 : sizeof(SubXactInfo));
4199 158 : MemoryContextSwitchTo(oldctx);
4200 :
4201 158 : if (len > 0)
4202 158 : BufFileReadExact(fd, subxact_data.subxacts, len);
4203 :
4204 158 : BufFileClose(fd);
4205 : }
4206 :
4207 : /*
4208 : * subxact_info_add
4209 : * Add information about a subxact (offset in the main file).
4210 : */
4211 : static void
4212 205024 : subxact_info_add(TransactionId xid)
4213 : {
4214 205024 : SubXactInfo *subxacts = subxact_data.subxacts;
4215 : int64 i;
4216 :
4217 : /* We must have a valid top level stream xid and a stream fd. */
4218 : Assert(TransactionIdIsValid(stream_xid));
4219 : Assert(stream_fd != NULL);
4220 :
4221 : /*
4222 : * If the XID matches the toplevel transaction, we don't want to add it.
4223 : */
4224 205024 : if (stream_xid == xid)
4225 184776 : return;
4226 :
4227 : /*
4228 : * In most cases we're checking the same subxact as we've already seen in
4229 : * the last call, so make sure to ignore it (this change comes later).
4230 : */
4231 20248 : if (subxact_data.subxact_last == xid)
4232 20096 : return;
4233 :
4234 : /* OK, remember we're processing this XID. */
4235 152 : subxact_data.subxact_last = xid;
4236 :
4237 : /*
4238 : * Check if the transaction is already present in the array of subxact. We
4239 : * intentionally scan the array from the tail, because we're likely adding
4240 : * a change for the most recent subtransactions.
4241 : *
4242 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4243 : * would allow us to use binary search here.
4244 : */
4245 190 : for (i = subxact_data.nsubxacts; i > 0; i--)
4246 : {
4247 : /* found, so we're done */
4248 152 : if (subxacts[i - 1].xid == xid)
4249 114 : return;
4250 : }
4251 :
4252 : /* This is a new subxact, so we need to add it to the array. */
4253 38 : if (subxact_data.nsubxacts == 0)
4254 : {
4255 : MemoryContext oldctx;
4256 :
4257 16 : subxact_data.nsubxacts_max = 128;
4258 :
4259 : /*
4260 : * Allocate this memory for subxacts in per-stream context, see
4261 : * subxact_info_read.
4262 : */
4263 16 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4264 16 : subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4265 16 : MemoryContextSwitchTo(oldctx);
4266 : }
4267 22 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
4268 : {
4269 20 : subxact_data.nsubxacts_max *= 2;
4270 20 : subxacts = repalloc(subxacts,
4271 20 : subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4272 : }
4273 :
4274 38 : subxacts[subxact_data.nsubxacts].xid = xid;
4275 :
4276 : /*
4277 : * Get the current offset of the stream file and store it as offset of
4278 : * this subxact.
4279 : */
4280 38 : BufFileTell(stream_fd,
4281 38 : &subxacts[subxact_data.nsubxacts].fileno,
4282 38 : &subxacts[subxact_data.nsubxacts].offset);
4283 :
4284 38 : subxact_data.nsubxacts++;
4285 38 : subxact_data.subxacts = subxacts;
4286 : }
4287 :
4288 : /* format filename for file containing the info about subxacts */
4289 : static inline void
4290 1494 : subxact_filename(char *path, Oid subid, TransactionId xid)
4291 : {
4292 1494 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4293 1494 : }
4294 :
4295 : /* format filename for file containing serialized changes */
4296 : static inline void
4297 876 : changes_filename(char *path, Oid subid, TransactionId xid)
4298 : {
4299 876 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4300 876 : }
4301 :
4302 : /*
4303 : * stream_cleanup_files
4304 : * Cleanup files for a subscription / toplevel transaction.
4305 : *
4306 : * Remove files with serialized changes and subxact info for a particular
4307 : * toplevel transaction. Each subscription has a separate set of files
4308 : * for any toplevel transaction.
4309 : */
4310 : void
4311 62 : stream_cleanup_files(Oid subid, TransactionId xid)
4312 : {
4313 : char path[MAXPGPATH];
4314 :
4315 : /* Delete the changes file. */
4316 62 : changes_filename(path, subid, xid);
4317 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
4318 :
4319 : /* Delete the subxact file, if it exists. */
4320 62 : subxact_filename(path, subid, xid);
4321 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4322 62 : }
4323 :
4324 : /*
4325 : * stream_open_file
4326 : * Open a file that we'll use to serialize changes for a toplevel
4327 : * transaction.
4328 : *
4329 : * Open a file for streamed changes from a toplevel transaction identified
4330 : * by stream_xid (global variable). If it's the first chunk of streamed
4331 : * changes for this transaction, create the buffile, otherwise open the
4332 : * previously created file.
4333 : */
4334 : static void
4335 726 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4336 : {
4337 : char path[MAXPGPATH];
4338 : MemoryContext oldcxt;
4339 :
4340 : Assert(OidIsValid(subid));
4341 : Assert(TransactionIdIsValid(xid));
4342 : Assert(stream_fd == NULL);
4343 :
4344 :
4345 726 : changes_filename(path, subid, xid);
4346 726 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4347 :
4348 : /*
4349 : * Create/open the buffiles under the logical streaming context so that we
4350 : * have those files until stream stop.
4351 : */
4352 726 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
4353 :
4354 : /*
4355 : * If this is the first streamed segment, create the changes file.
4356 : * Otherwise, just open the file for writing, in append mode.
4357 : */
4358 726 : if (first_segment)
4359 64 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
4360 : path);
4361 : else
4362 : {
4363 : /*
4364 : * Open the file and seek to the end of the file because we always
4365 : * append the changes file.
4366 : */
4367 662 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
4368 : path, O_RDWR, false);
4369 662 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
4370 : }
4371 :
4372 726 : MemoryContextSwitchTo(oldcxt);
4373 726 : }
4374 :
4375 : /*
4376 : * stream_close_file
4377 : * Close the currently open file with streamed changes.
4378 : */
4379 : static void
4380 786 : stream_close_file(void)
4381 : {
4382 : Assert(stream_fd != NULL);
4383 :
4384 786 : BufFileClose(stream_fd);
4385 :
4386 786 : stream_fd = NULL;
4387 786 : }
4388 :
4389 : /*
4390 : * stream_write_change
4391 : * Serialize a change to a file for the current toplevel transaction.
4392 : *
4393 : * The change is serialized in a simple format, with length (not including
4394 : * the length), action code (identifying the message type) and message
4395 : * contents (without the subxact TransactionId value).
4396 : */
4397 : static void
4398 215106 : stream_write_change(char action, StringInfo s)
4399 : {
4400 : int len;
4401 :
4402 : Assert(stream_fd != NULL);
4403 :
4404 : /* total on-disk size, including the action type character */
4405 215106 : len = (s->len - s->cursor) + sizeof(char);
4406 :
4407 : /* first write the size */
4408 215106 : BufFileWrite(stream_fd, &len, sizeof(len));
4409 :
4410 : /* then the action */
4411 215106 : BufFileWrite(stream_fd, &action, sizeof(action));
4412 :
4413 : /* and finally the remaining part of the buffer (after the XID) */
4414 215106 : len = (s->len - s->cursor);
4415 :
4416 215106 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
4417 215106 : }
4418 :
4419 : /*
4420 : * stream_open_and_write_change
4421 : * Serialize a message to a file for the given transaction.
4422 : *
4423 : * This function is similar to stream_write_change except that it will open the
4424 : * target file if not already before writing the message and close the file at
4425 : * the end.
4426 : */
4427 : static void
4428 10 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
4429 : {
4430 : Assert(!in_streamed_transaction);
4431 :
4432 10 : if (!stream_fd)
4433 10 : stream_start_internal(xid, false);
4434 :
4435 10 : stream_write_change(action, s);
4436 10 : stream_stop_internal(xid);
4437 10 : }
4438 :
4439 : /*
4440 : * Sets streaming options including replication slot name and origin start
4441 : * position. Workers need these options for logical replication.
4442 : */
4443 : void
4444 740 : set_stream_options(WalRcvStreamOptions *options,
4445 : char *slotname,
4446 : XLogRecPtr *origin_startpos)
4447 : {
4448 : int server_version;
4449 :
4450 740 : options->logical = true;
4451 740 : options->startpoint = *origin_startpos;
4452 740 : options->slotname = slotname;
4453 :
4454 740 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
4455 740 : options->proto.logical.proto_version =
4456 740 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
4457 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
4458 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
4459 : LOGICALREP_PROTO_VERSION_NUM;
4460 :
4461 740 : options->proto.logical.publication_names = MySubscription->publications;
4462 740 : options->proto.logical.binary = MySubscription->binary;
4463 :
4464 : /*
4465 : * Assign the appropriate option value for streaming option according to
4466 : * the 'streaming' mode and the publisher's ability to support that mode.
4467 : */
4468 740 : if (server_version >= 160000 &&
4469 740 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
4470 : {
4471 672 : options->proto.logical.streaming_str = "parallel";
4472 672 : MyLogicalRepWorker->parallel_apply = true;
4473 : }
4474 68 : else if (server_version >= 140000 &&
4475 68 : MySubscription->stream != LOGICALREP_STREAM_OFF)
4476 : {
4477 52 : options->proto.logical.streaming_str = "on";
4478 52 : MyLogicalRepWorker->parallel_apply = false;
4479 : }
4480 : else
4481 : {
4482 16 : options->proto.logical.streaming_str = NULL;
4483 16 : MyLogicalRepWorker->parallel_apply = false;
4484 : }
4485 :
4486 740 : options->proto.logical.twophase = false;
4487 740 : options->proto.logical.origin = pstrdup(MySubscription->origin);
4488 740 : }
4489 :
4490 : /*
4491 : * Cleanup the memory for subxacts and reset the related variables.
4492 : */
4493 : static inline void
4494 752 : cleanup_subxact_info()
4495 : {
4496 752 : if (subxact_data.subxacts)
4497 174 : pfree(subxact_data.subxacts);
4498 :
4499 752 : subxact_data.subxacts = NULL;
4500 752 : subxact_data.subxact_last = InvalidTransactionId;
4501 752 : subxact_data.nsubxacts = 0;
4502 752 : subxact_data.nsubxacts_max = 0;
4503 752 : }
4504 :
4505 : /*
4506 : * Common function to run the apply loop with error handling. Disable the
4507 : * subscription, if necessary.
4508 : *
4509 : * Note that we don't handle FATAL errors which are probably because
4510 : * of system resource error and are not repeatable.
4511 : */
4512 : void
4513 740 : start_apply(XLogRecPtr origin_startpos)
4514 : {
4515 740 : PG_TRY();
4516 : {
4517 740 : LogicalRepApplyLoop(origin_startpos);
4518 : }
4519 112 : PG_CATCH();
4520 : {
4521 : /*
4522 : * Reset the origin state to prevent the advancement of origin
4523 : * progress if we fail to apply. Otherwise, this will result in
4524 : * transaction loss as that transaction won't be sent again by the
4525 : * server.
4526 : */
4527 112 : replorigin_reset(0, (Datum) 0);
4528 :
4529 112 : if (MySubscription->disableonerr)
4530 6 : DisableSubscriptionAndExit();
4531 : else
4532 : {
4533 : /*
4534 : * Report the worker failed while applying changes. Abort the
4535 : * current transaction so that the stats message is sent in an
4536 : * idle state.
4537 : */
4538 106 : AbortOutOfAnyTransaction();
4539 106 : pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
4540 :
4541 106 : PG_RE_THROW();
4542 : }
4543 : }
4544 0 : PG_END_TRY();
4545 0 : }
4546 :
4547 : /*
4548 : * Runs the leader apply worker.
4549 : *
4550 : * It sets up replication origin, streaming options and then starts streaming.
4551 : */
4552 : static void
4553 436 : run_apply_worker()
4554 : {
4555 : char originname[NAMEDATALEN];
4556 436 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4557 436 : char *slotname = NULL;
4558 : WalRcvStreamOptions options;
4559 : RepOriginId originid;
4560 : TimeLineID startpointTLI;
4561 : char *err;
4562 : bool must_use_password;
4563 :
4564 436 : slotname = MySubscription->slotname;
4565 :
4566 : /*
4567 : * This shouldn't happen if the subscription is enabled, but guard against
4568 : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4569 : * slot is NULL.)
4570 : */
4571 436 : if (!slotname)
4572 0 : ereport(ERROR,
4573 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4574 : errmsg("subscription has no replication slot set")));
4575 :
4576 : /* Setup replication origin tracking. */
4577 436 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
4578 : originname, sizeof(originname));
4579 436 : StartTransactionCommand();
4580 436 : originid = replorigin_by_name(originname, true);
4581 436 : if (!OidIsValid(originid))
4582 0 : originid = replorigin_create(originname);
4583 436 : replorigin_session_setup(originid, 0);
4584 436 : replorigin_session_origin = originid;
4585 436 : origin_startpos = replorigin_session_get_progress(false);
4586 436 : CommitTransactionCommand();
4587 :
4588 : /* Is the use of a password mandatory? */
4589 836 : must_use_password = MySubscription->passwordrequired &&
4590 400 : !MySubscription->ownersuperuser;
4591 :
4592 436 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
4593 : true, must_use_password,
4594 : MySubscription->name, &err);
4595 :
4596 418 : if (LogRepWorkerWalRcvConn == NULL)
4597 42 : ereport(ERROR,
4598 : (errcode(ERRCODE_CONNECTION_FAILURE),
4599 : errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
4600 : MySubscription->name, err)));
4601 :
4602 : /*
4603 : * We don't really use the output identify_system for anything but it does
4604 : * some initializations on the upstream so let's still call it.
4605 : */
4606 376 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4607 :
4608 376 : set_apply_error_context_origin(originname);
4609 :
4610 376 : set_stream_options(&options, slotname, &origin_startpos);
4611 :
4612 : /*
4613 : * Even when the two_phase mode is requested by the user, it remains as
4614 : * the tri-state PENDING until all tablesyncs have reached READY state.
4615 : * Only then, can it become ENABLED.
4616 : *
4617 : * Note: If the subscription has no tables then leave the state as
4618 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4619 : * work.
4620 : */
4621 408 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
4622 32 : AllTablesyncsReady())
4623 : {
4624 : /* Start streaming with two_phase enabled */
4625 18 : options.proto.logical.twophase = true;
4626 18 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4627 :
4628 18 : StartTransactionCommand();
4629 18 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
4630 18 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
4631 18 : CommitTransactionCommand();
4632 : }
4633 : else
4634 : {
4635 358 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4636 : }
4637 :
4638 376 : ereport(DEBUG1,
4639 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4640 : MySubscription->name,
4641 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
4642 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
4643 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
4644 : "?")));
4645 :
4646 : /* Run the main loop. */
4647 376 : start_apply(origin_startpos);
4648 0 : }
4649 :
4650 : /*
4651 : * Common initialization for leader apply worker, parallel apply worker and
4652 : * tablesync worker.
4653 : *
4654 : * Initialize the database connection, in-memory subscription and necessary
4655 : * config options.
4656 : */
4657 : void
4658 848 : InitializeLogRepWorker(void)
4659 : {
4660 : MemoryContext oldctx;
4661 :
4662 : /* Run as replica session replication role. */
4663 848 : SetConfigOption("session_replication_role", "replica",
4664 : PGC_SUSET, PGC_S_OVERRIDE);
4665 :
4666 : /* Connect to our database. */
4667 848 : BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
4668 848 : MyLogicalRepWorker->userid,
4669 : 0);
4670 :
4671 : /*
4672 : * Set always-secure search path, so malicious users can't redirect user
4673 : * code (e.g. pg_index.indexprs).
4674 : */
4675 842 : SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4676 :
4677 : /* Load the subscription into persistent memory context. */
4678 842 : ApplyContext = AllocSetContextCreate(TopMemoryContext,
4679 : "ApplyContext",
4680 : ALLOCSET_DEFAULT_SIZES);
4681 842 : StartTransactionCommand();
4682 842 : oldctx = MemoryContextSwitchTo(ApplyContext);
4683 :
4684 842 : MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
4685 842 : if (!MySubscription)
4686 : {
4687 0 : ereport(LOG,
4688 : (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4689 : MyLogicalRepWorker->subid)));
4690 :
4691 : /* Ensure we remove no-longer-useful entry for worker's start time */
4692 0 : if (am_leader_apply_worker())
4693 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4694 :
4695 0 : proc_exit(0);
4696 : }
4697 :
4698 842 : MySubscriptionValid = true;
4699 842 : MemoryContextSwitchTo(oldctx);
4700 :
4701 842 : if (!MySubscription->enabled)
4702 : {
4703 0 : ereport(LOG,
4704 : (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4705 : MySubscription->name)));
4706 :
4707 0 : apply_worker_exit();
4708 : }
4709 :
4710 : /* Setup synchronous commit according to the user's wishes */
4711 842 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
4712 : PGC_BACKEND, PGC_S_OVERRIDE);
4713 :
4714 : /*
4715 : * Keep us informed about subscription or role changes. Note that the
4716 : * role's superuser privilege can be revoked.
4717 : */
4718 842 : CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4719 : subscription_change_cb,
4720 : (Datum) 0);
4721 :
4722 842 : CacheRegisterSyscacheCallback(AUTHOID,
4723 : subscription_change_cb,
4724 : (Datum) 0);
4725 :
4726 842 : if (am_tablesync_worker())
4727 386 : ereport(LOG,
4728 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4729 : MySubscription->name,
4730 : get_rel_name(MyLogicalRepWorker->relid))));
4731 : else
4732 456 : ereport(LOG,
4733 : (errmsg("logical replication apply worker for subscription \"%s\" has started",
4734 : MySubscription->name)));
4735 :
4736 842 : CommitTransactionCommand();
4737 842 : }
4738 :
4739 : /*
4740 : * Reset the origin state.
4741 : */
4742 : static void
4743 934 : replorigin_reset(int code, Datum arg)
4744 : {
4745 934 : replorigin_session_origin = InvalidRepOriginId;
4746 934 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
4747 934 : replorigin_session_origin_timestamp = 0;
4748 934 : }
4749 :
4750 : /* Common function to setup the leader apply or tablesync worker. */
4751 : void
4752 828 : SetupApplyOrSyncWorker(int worker_slot)
4753 : {
4754 : /* Attach to slot */
4755 828 : logicalrep_worker_attach(worker_slot);
4756 :
4757 : Assert(am_tablesync_worker() || am_leader_apply_worker());
4758 :
4759 : /* Setup signal handling */
4760 828 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
4761 828 : pqsignal(SIGTERM, die);
4762 828 : BackgroundWorkerUnblockSignals();
4763 :
4764 : /*
4765 : * We don't currently need any ResourceOwner in a walreceiver process, but
4766 : * if we did, we could call CreateAuxProcessResourceOwner here.
4767 : */
4768 :
4769 : /* Initialise stats to a sanish value */
4770 828 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
4771 828 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
4772 :
4773 : /* Load the libpq-specific functions */
4774 828 : load_file("libpqwalreceiver", false);
4775 :
4776 828 : InitializeLogRepWorker();
4777 :
4778 : /*
4779 : * Register a callback to reset the origin state before aborting any
4780 : * pending transaction during shutdown (see ShutdownPostgres()). This will
4781 : * avoid origin advancement for an in-complete transaction which could
4782 : * otherwise lead to its loss as such a transaction won't be sent by the
4783 : * server again.
4784 : *
4785 : * Note that even a LOG or DEBUG statement placed after setting the origin
4786 : * state may process a shutdown signal before committing the current apply
4787 : * operation. So, it is important to register such a callback here.
4788 : */
4789 822 : before_shmem_exit(replorigin_reset, (Datum) 0);
4790 :
4791 : /* Connect to the origin and start the replication. */
4792 822 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4793 : MySubscription->conninfo);
4794 :
4795 : /*
4796 : * Setup callback for syscache so that we know when something changes in
4797 : * the subscription relation state.
4798 : */
4799 822 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4800 : invalidate_syncing_table_states,
4801 : (Datum) 0);
4802 822 : }
4803 :
4804 : /* Logical Replication Apply worker entry point */
4805 : void
4806 442 : ApplyWorkerMain(Datum main_arg)
4807 : {
4808 442 : int worker_slot = DatumGetInt32(main_arg);
4809 :
4810 442 : InitializingApplyWorker = true;
4811 :
4812 442 : SetupApplyOrSyncWorker(worker_slot);
4813 :
4814 436 : InitializingApplyWorker = false;
4815 :
4816 436 : run_apply_worker();
4817 :
4818 0 : proc_exit(0);
4819 : }
4820 :
4821 : /*
4822 : * After error recovery, disable the subscription in a new transaction
4823 : * and exit cleanly.
4824 : */
4825 : void
4826 8 : DisableSubscriptionAndExit(void)
4827 : {
4828 : /*
4829 : * Emit the error message, and recover from the error state to an idle
4830 : * state
4831 : */
4832 8 : HOLD_INTERRUPTS();
4833 :
4834 8 : EmitErrorReport();
4835 8 : AbortOutOfAnyTransaction();
4836 8 : FlushErrorState();
4837 :
4838 8 : RESUME_INTERRUPTS();
4839 :
4840 : /* Report the worker failed during either table synchronization or apply */
4841 8 : pgstat_report_subscription_error(MyLogicalRepWorker->subid,
4842 8 : !am_tablesync_worker());
4843 :
4844 : /* Disable the subscription */
4845 8 : StartTransactionCommand();
4846 8 : DisableSubscription(MySubscription->oid);
4847 8 : CommitTransactionCommand();
4848 :
4849 : /* Ensure we remove no-longer-useful entry for worker's start time */
4850 8 : if (am_leader_apply_worker())
4851 6 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4852 :
4853 : /* Notify the subscription has been disabled and exit */
4854 8 : ereport(LOG,
4855 : errmsg("subscription \"%s\" has been disabled because of an error",
4856 : MySubscription->name));
4857 :
4858 8 : proc_exit(0);
4859 : }
4860 :
4861 : /*
4862 : * Is current process a logical replication worker?
4863 : */
4864 : bool
4865 3962 : IsLogicalWorker(void)
4866 : {
4867 3962 : return MyLogicalRepWorker != NULL;
4868 : }
4869 :
4870 : /*
4871 : * Is current process a logical replication parallel apply worker?
4872 : */
4873 : bool
4874 2758 : IsLogicalParallelApplyWorker(void)
4875 : {
4876 2758 : return IsLogicalWorker() && am_parallel_apply_worker();
4877 : }
4878 :
4879 : /*
4880 : * Start skipping changes of the transaction if the given LSN matches the
4881 : * LSN specified by subscription's skiplsn.
4882 : */
4883 : static void
4884 992 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
4885 : {
4886 : Assert(!is_skipping_changes());
4887 : Assert(!in_remote_transaction);
4888 : Assert(!in_streamed_transaction);
4889 :
4890 : /*
4891 : * Quick return if it's not requested to skip this transaction. This
4892 : * function is called for every remote transaction and we assume that
4893 : * skipping the transaction is not used often.
4894 : */
4895 992 : if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
4896 : MySubscription->skiplsn != finish_lsn))
4897 986 : return;
4898 :
4899 : /* Start skipping all changes of this transaction */
4900 6 : skip_xact_finish_lsn = finish_lsn;
4901 :
4902 6 : ereport(LOG,
4903 : errmsg("logical replication starts skipping transaction at LSN %X/%X",
4904 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
4905 : }
4906 :
4907 : /*
4908 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4909 : */
4910 : static void
4911 54 : stop_skipping_changes(void)
4912 : {
4913 54 : if (!is_skipping_changes())
4914 48 : return;
4915 :
4916 6 : ereport(LOG,
4917 : (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4918 : LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
4919 :
4920 : /* Stop skipping changes */
4921 6 : skip_xact_finish_lsn = InvalidXLogRecPtr;
4922 : }
4923 :
4924 : /*
4925 : * Clear subskiplsn of pg_subscription catalog.
4926 : *
4927 : * finish_lsn is the transaction's finish LSN that is used to check if the
4928 : * subskiplsn matches it. If not matched, we raise a warning when clearing the
4929 : * subskiplsn in order to inform users for cases e.g., where the user mistakenly
4930 : * specified the wrong subskiplsn.
4931 : */
4932 : static void
4933 1028 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
4934 : {
4935 : Relation rel;
4936 : Form_pg_subscription subform;
4937 : HeapTuple tup;
4938 1028 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
4939 1028 : bool started_tx = false;
4940 :
4941 1028 : if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
4942 1022 : return;
4943 :
4944 6 : if (!IsTransactionState())
4945 : {
4946 2 : StartTransactionCommand();
4947 2 : started_tx = true;
4948 : }
4949 :
4950 : /*
4951 : * Protect subskiplsn of pg_subscription from being concurrently updated
4952 : * while clearing it.
4953 : */
4954 6 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4955 : AccessShareLock);
4956 :
4957 6 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4958 :
4959 : /* Fetch the existing tuple. */
4960 6 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4961 : ObjectIdGetDatum(MySubscription->oid));
4962 :
4963 6 : if (!HeapTupleIsValid(tup))
4964 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4965 :
4966 6 : subform = (Form_pg_subscription) GETSTRUCT(tup);
4967 :
4968 : /*
4969 : * Clear the subskiplsn. If the user has already changed subskiplsn before
4970 : * clearing it we don't update the catalog and the replication origin
4971 : * state won't get advanced. So in the worst case, if the server crashes
4972 : * before sending an acknowledgment of the flush position the transaction
4973 : * will be sent again and the user needs to set subskiplsn again. We can
4974 : * reduce the possibility by logging a replication origin WAL record to
4975 : * advance the origin LSN instead but there is no way to advance the
4976 : * origin timestamp and it doesn't seem to be worth doing anything about
4977 : * it since it's a very rare case.
4978 : */
4979 6 : if (subform->subskiplsn == myskiplsn)
4980 : {
4981 : bool nulls[Natts_pg_subscription];
4982 : bool replaces[Natts_pg_subscription];
4983 : Datum values[Natts_pg_subscription];
4984 :
4985 6 : memset(values, 0, sizeof(values));
4986 6 : memset(nulls, false, sizeof(nulls));
4987 6 : memset(replaces, false, sizeof(replaces));
4988 :
4989 : /* reset subskiplsn */
4990 6 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4991 6 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4992 :
4993 6 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4994 : replaces);
4995 6 : CatalogTupleUpdate(rel, &tup->t_self, tup);
4996 :
4997 6 : if (myskiplsn != finish_lsn)
4998 0 : ereport(WARNING,
4999 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
5000 : errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
5001 : LSN_FORMAT_ARGS(finish_lsn),
5002 : LSN_FORMAT_ARGS(myskiplsn)));
5003 : }
5004 :
5005 6 : heap_freetuple(tup);
5006 6 : table_close(rel, NoLock);
5007 :
5008 6 : if (started_tx)
5009 2 : CommitTransactionCommand();
5010 : }
5011 :
5012 : /* Error callback to give more context info about the change being applied */
5013 : void
5014 1418 : apply_error_callback(void *arg)
5015 : {
5016 1418 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
5017 :
5018 1418 : if (apply_error_callback_arg.command == 0)
5019 690 : return;
5020 :
5021 : Assert(errarg->origin_name);
5022 :
5023 728 : if (errarg->rel == NULL)
5024 : {
5025 642 : if (!TransactionIdIsValid(errarg->remote_xid))
5026 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
5027 : errarg->origin_name,
5028 : logicalrep_message_type(errarg->command));
5029 642 : else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5030 532 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
5031 : errarg->origin_name,
5032 : logicalrep_message_type(errarg->command),
5033 : errarg->remote_xid);
5034 : else
5035 220 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
5036 : errarg->origin_name,
5037 : logicalrep_message_type(errarg->command),
5038 : errarg->remote_xid,
5039 110 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5040 : }
5041 : else
5042 : {
5043 86 : if (errarg->remote_attnum < 0)
5044 : {
5045 86 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5046 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
5047 : errarg->origin_name,
5048 : logicalrep_message_type(errarg->command),
5049 0 : errarg->rel->remoterel.nspname,
5050 0 : errarg->rel->remoterel.relname,
5051 : errarg->remote_xid);
5052 : else
5053 172 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
5054 : errarg->origin_name,
5055 : logicalrep_message_type(errarg->command),
5056 86 : errarg->rel->remoterel.nspname,
5057 86 : errarg->rel->remoterel.relname,
5058 : errarg->remote_xid,
5059 86 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5060 : }
5061 : else
5062 : {
5063 0 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5064 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
5065 : errarg->origin_name,
5066 : logicalrep_message_type(errarg->command),
5067 0 : errarg->rel->remoterel.nspname,
5068 0 : errarg->rel->remoterel.relname,
5069 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
5070 : errarg->remote_xid);
5071 : else
5072 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
5073 : errarg->origin_name,
5074 : logicalrep_message_type(errarg->command),
5075 0 : errarg->rel->remoterel.nspname,
5076 0 : errarg->rel->remoterel.relname,
5077 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
5078 : errarg->remote_xid,
5079 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5080 : }
5081 : }
5082 : }
5083 :
5084 : /* Set transaction information of apply error callback */
5085 : static inline void
5086 5764 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
5087 : {
5088 5764 : apply_error_callback_arg.remote_xid = xid;
5089 5764 : apply_error_callback_arg.finish_lsn = lsn;
5090 5764 : }
5091 :
5092 : /* Reset all information of apply error callback */
5093 : static inline void
5094 2846 : reset_apply_error_context_info(void)
5095 : {
5096 2846 : apply_error_callback_arg.command = 0;
5097 2846 : apply_error_callback_arg.rel = NULL;
5098 2846 : apply_error_callback_arg.remote_attnum = -1;
5099 2846 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
5100 2846 : }
5101 :
5102 : /*
5103 : * Request wakeup of the workers for the given subscription OID
5104 : * at commit of the current transaction.
5105 : *
5106 : * This is used to ensure that the workers process assorted changes
5107 : * as soon as possible.
5108 : */
5109 : void
5110 386 : LogicalRepWorkersWakeupAtCommit(Oid subid)
5111 : {
5112 : MemoryContext oldcxt;
5113 :
5114 386 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
5115 386 : on_commit_wakeup_workers_subids =
5116 386 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
5117 386 : MemoryContextSwitchTo(oldcxt);
5118 386 : }
5119 :
5120 : /*
5121 : * Wake up the workers of any subscriptions that were changed in this xact.
5122 : */
5123 : void
5124 855512 : AtEOXact_LogicalRepWorkers(bool isCommit)
5125 : {
5126 855512 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
5127 : {
5128 : ListCell *lc;
5129 :
5130 376 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5131 752 : foreach(lc, on_commit_wakeup_workers_subids)
5132 : {
5133 376 : Oid subid = lfirst_oid(lc);
5134 : List *workers;
5135 : ListCell *lc2;
5136 :
5137 376 : workers = logicalrep_workers_find(subid, true, false);
5138 486 : foreach(lc2, workers)
5139 : {
5140 110 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5141 :
5142 110 : logicalrep_worker_wakeup_ptr(worker);
5143 : }
5144 : }
5145 376 : LWLockRelease(LogicalRepWorkerLock);
5146 : }
5147 :
5148 : /* The List storage will be reclaimed automatically in xact cleanup. */
5149 855512 : on_commit_wakeup_workers_subids = NIL;
5150 855512 : }
5151 :
5152 : /*
5153 : * Allocate the origin name in long-lived context for error context message.
5154 : */
5155 : void
5156 760 : set_apply_error_context_origin(char *originname)
5157 : {
5158 760 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
5159 : originname);
5160 760 : }
5161 :
5162 : /*
5163 : * Return the action to be taken for the given transaction. See
5164 : * TransApplyAction for information on each of the actions.
5165 : *
5166 : * *winfo is assigned to the destination parallel worker info when the leader
5167 : * apply worker has to pass all the transaction's changes to the parallel
5168 : * apply worker.
5169 : */
5170 : static TransApplyAction
5171 653342 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
5172 : {
5173 653342 : *winfo = NULL;
5174 :
5175 653342 : if (am_parallel_apply_worker())
5176 : {
5177 138786 : return TRANS_PARALLEL_APPLY;
5178 : }
5179 :
5180 : /*
5181 : * If we are processing this transaction using a parallel apply worker
5182 : * then either we send the changes to the parallel worker or if the worker
5183 : * is busy then serialize the changes to the file which will later be
5184 : * processed by the parallel worker.
5185 : */
5186 514556 : *winfo = pa_find_worker(xid);
5187 :
5188 514556 : if (*winfo && (*winfo)->serialize_changes)
5189 : {
5190 10074 : return TRANS_LEADER_PARTIAL_SERIALIZE;
5191 : }
5192 504482 : else if (*winfo)
5193 : {
5194 137824 : return TRANS_LEADER_SEND_TO_PARALLEL;
5195 : }
5196 :
5197 : /*
5198 : * If there is no parallel worker involved to process this transaction
5199 : * then we either directly apply the change or serialize it to a file
5200 : * which will later be applied when the transaction finish message is
5201 : * processed.
5202 : */
5203 366658 : else if (in_streamed_transaction)
5204 : {
5205 206396 : return TRANS_LEADER_SERIALIZE;
5206 : }
5207 : else
5208 : {
5209 160262 : return TRANS_LEADER_APPLY;
5210 : }
5211 : }
|