Line   Hit count   Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2024, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server-facing code and shares the libpqwalreceiver
19 : * module with walreceiver to provide the libpq-specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option to on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions must
33 : * deal with aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which are then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in the temporary-file directory by default, and the
38 : * filenames include both the XID of the toplevel transaction and the OID of
39 : * the subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID don't interfere with each other.
41 : *
42 : * We use BufFiles instead of normal temporary files because (a) the BufFile
43 : * infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) it provides automatic cleanup on error, and (c) it allows these
45 : * files to survive across local transactions, so they can be opened and
46 : * closed at each stream start and stop. We decided to use the FileSet
47 : * infrastructure because without it the files are deleted when the BufFile
48 : * is closed, and if we kept the stream files open across start/stop streams
49 : * it would consume a lot of memory (more than 8K for each BufFile, and
50 : * there could be multiple such BufFiles since the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we didn't use FileSet we would also need to invent
53 : * a new way to pass filenames to the BufFile APIs so that we could open
54 : * the desired file across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option to parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two-phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible for a prepared transaction to arrive at the apply worker while
68 : * the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition in should_apply_changes_for_rel). The
71 : * tablesync worker might not receive such a prepared transaction because,
72 : * say, it was prior to the initial consistent point, but it might have got
73 : * some later commits. The tablesync worker will then exit without doing
74 : * anything for the prepared transaction skipped by the apply worker, as its
75 : * sync location will already be ahead of the apply worker's current location.
76 : * This leads to an "empty prepare": when the apply worker later applies the
77 : * commit prepared, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this, and similar prepare confusion, the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * process_syncing_tables_for_apply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING, it starts streaming messages with the
98 : * two_phase option, which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * It is possible that, during the time two_phase was not yet enabled, the
101 : * publisher (replication server) skipped some prepares, but we ensure that
102 : * such prepares are sent along with the commit prepared; see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * We don't allow toggling the two_phase option of a subscription because it
113 : * can lead to an inconsistent replica. Consider: initially it was on and we
114 : * received some prepare, then we turned it off; now at commit time the server
115 : * will send the entire transaction data along with the commit. With some more
116 : * analysis, we could allow changing this option from off to on, but it is not
117 : * clear whether that alone would be useful.
118 : *
119 : * Finally, to avoid the problems mentioned in the previous paragraphs arising
120 : * from any subsequent (not READY) tablesyncs (which would require toggling the
121 : * two_phase option from 'on' to 'off' and back to 'on'), there is a restriction
122 : * on ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
123 : * the two_phase tri-state is ENABLED, except when copy_data = false.
124 : *
125 : * We can receive a prepare for the same GID more than once in the genuine
126 : * case where we have defined multiple subscriptions for publications on the
127 : * same server and the prepared transaction has operations on tables subscribed
128 : * to by those subscriptions. In such cases, if we used the GID sent by the
129 : * publisher, one of the prepares would succeed and the others would fail, in
130 : * which case the server would send them again. This can lead to a deadlock if
131 : * the user has set synchronous_standby_names for all the subscriptions on the
132 : * subscriber. To avoid such deadlocks, we generate a unique GID (consisting of
133 : * the subscription OID and the XID of the prepared transaction) for each
134 : * prepared transaction on the subscriber.
135 : *
136 : * FAILOVER
137 : * ----------------------
138 : * The logical slot on the primary can be synced to the standby by specifying
139 : * failover = true when creating the subscription. Enabling failover allows us
140 : * to smoothly transition to the promoted standby, ensuring that we can
141 : * subscribe to the new primary without losing any data.
142 : *-------------------------------------------------------------------------
143 : */
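
To make the GID scheme described above concrete, here is a hedged sketch (not part of worker.c; the real helper is TwoPhaseTransactionGid(), and the exact format string is an assumption) of how a subscriber-side GID could be derived from the subscription OID and the remote transaction's xid:

#include "postgres.h"

/* Illustrative only: example_subscriber_gid is a hypothetical name. */
static void
example_subscriber_gid(Oid suboid, TransactionId xid, char *gid, Size szgid)
{
    snprintf(gid, szgid, "pg_gid_%u_%u", suboid, xid);
}

Because the pair (subscription OID, remote XID) is unique on the subscriber, two subscriptions receiving the same remote prepared transaction no longer race on one GID.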
144 :
145 : #include "postgres.h"
146 :
147 : #include <sys/stat.h>
148 : #include <unistd.h>
149 :
150 : #include "access/table.h"
151 : #include "access/tableam.h"
152 : #include "access/twophase.h"
153 : #include "access/xact.h"
154 : #include "catalog/indexing.h"
155 : #include "catalog/pg_inherits.h"
156 : #include "catalog/pg_subscription.h"
157 : #include "catalog/pg_subscription_rel.h"
158 : #include "commands/tablecmds.h"
159 : #include "commands/trigger.h"
160 : #include "executor/executor.h"
161 : #include "executor/execPartition.h"
162 : #include "libpq/pqformat.h"
163 : #include "miscadmin.h"
164 : #include "optimizer/optimizer.h"
165 : #include "parser/parse_relation.h"
166 : #include "pgstat.h"
167 : #include "postmaster/bgworker.h"
168 : #include "postmaster/interrupt.h"
169 : #include "postmaster/walwriter.h"
170 : #include "replication/conflict.h"
171 : #include "replication/logicallauncher.h"
172 : #include "replication/logicalproto.h"
173 : #include "replication/logicalrelation.h"
174 : #include "replication/logicalworker.h"
175 : #include "replication/origin.h"
176 : #include "replication/walreceiver.h"
177 : #include "replication/worker_internal.h"
178 : #include "rewrite/rewriteHandler.h"
179 : #include "storage/buffile.h"
180 : #include "storage/ipc.h"
181 : #include "storage/lmgr.h"
182 : #include "tcop/tcopprot.h"
183 : #include "utils/acl.h"
184 : #include "utils/dynahash.h"
185 : #include "utils/guc.h"
186 : #include "utils/inval.h"
187 : #include "utils/lsyscache.h"
188 : #include "utils/memutils.h"
189 : #include "utils/pg_lsn.h"
190 : #include "utils/rel.h"
191 : #include "utils/rls.h"
192 : #include "utils/snapmgr.h"
193 : #include "utils/syscache.h"
194 : #include "utils/usercontext.h"
195 :
196 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
197 :
198 : typedef struct FlushPosition
199 : {
200 : dlist_node node;
201 : XLogRecPtr local_end;
202 : XLogRecPtr remote_end;
203 : } FlushPosition;
204 :
205 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
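
The lsn_mapping list is consumed by the feedback logic defined later in this file (store_flush_position / get_flush_position, outside this excerpt). As a hedged sketch of how an entry is queued, assuming ApplyContext (declared further below) is the long-lived memory context:

static void
example_store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
{
    FlushPosition *flushpos;

    /* The entry must outlive the current message, so allocate it in ApplyContext. */
    flushpos = (FlushPosition *) MemoryContextAlloc(ApplyContext, sizeof(FlushPosition));
    flushpos->local_end = local_lsn;
    flushpos->remote_end = remote_lsn;
    dlist_push_tail(&lsn_mapping, &flushpos->node);
}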
206 :
207 : typedef struct ApplyExecutionData
208 : {
209 : EState *estate; /* executor state, used to track resources */
210 :
211 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
212 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
213 :
214 : /* These fields are used when the target relation is partitioned: */
215 : ModifyTableState *mtstate; /* dummy ModifyTable state */
216 : PartitionTupleRouting *proute; /* partition routing info */
217 : } ApplyExecutionData;
218 :
219 : /* Struct for saving and restoring apply errcontext information */
220 : typedef struct ApplyErrorCallbackArg
221 : {
222 : LogicalRepMsgType command; /* 0 if invalid */
223 : LogicalRepRelMapEntry *rel;
224 :
225 : /* Remote node information */
226 : int remote_attnum; /* -1 if invalid */
227 : TransactionId remote_xid;
228 : XLogRecPtr finish_lsn;
229 : char *origin_name;
230 : } ApplyErrorCallbackArg;
231 :
232 : /*
233 : * The action to be taken for the changes in the transaction.
234 : *
235 : * TRANS_LEADER_APPLY:
236 : * This action means that we are in the leader apply worker or table sync
237 : * worker. The changes of the transaction are either directly applied or
238 : * are read from temporary files (for streaming transactions) and then
239 : * applied by the worker.
240 : *
241 : * TRANS_LEADER_SERIALIZE:
242 : * This action means that we are in the leader apply worker or table sync
243 : * worker. Changes are written to temporary files and then applied when the
244 : * final commit arrives.
245 : *
246 : * TRANS_LEADER_SEND_TO_PARALLEL:
247 : * This action means that we are in the leader apply worker and need to send
248 : * the changes to the parallel apply worker.
249 : *
250 : * TRANS_LEADER_PARTIAL_SERIALIZE:
251 : * This action means that we are in the leader apply worker and have sent some
252 : * changes directly to the parallel apply worker and the remaining changes are
253 : * serialized to a file due to a timeout while sending data. The parallel apply
254 : * worker will apply these serialized changes when the final commit arrives.
255 : *
256 : * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
257 : * serializing changes, the leader worker also needs to serialize the
258 : * STREAM_XXX message to a file, and wait for the parallel apply worker to
259 : * finish the transaction when processing the transaction finish command. So
260 : * this new action was introduced to keep the code and logic clear.
261 : *
262 : * TRANS_PARALLEL_APPLY:
263 : * This action means that we are in the parallel apply worker and changes of
264 : * the transaction are applied directly by the worker.
265 : */
266 : typedef enum
267 : {
268 : /* The action for non-streaming transactions. */
269 : TRANS_LEADER_APPLY,
270 :
271 : /* Actions for streaming transactions. */
272 : TRANS_LEADER_SERIALIZE,
273 : TRANS_LEADER_SEND_TO_PARALLEL,
274 : TRANS_LEADER_PARTIAL_SERIALIZE,
275 : TRANS_PARALLEL_APPLY,
276 : } TransApplyAction;
277 :
278 : /* errcontext tracker */
279 : static ApplyErrorCallbackArg apply_error_callback_arg =
280 : {
281 : .command = 0,
282 : .rel = NULL,
283 : .remote_attnum = -1,
284 : .remote_xid = InvalidTransactionId,
285 : .finish_lsn = InvalidXLogRecPtr,
286 : .origin_name = NULL,
287 : };
288 :
289 : ErrorContextCallback *apply_error_context_stack = NULL;
290 :
291 : MemoryContext ApplyMessageContext = NULL;
292 : MemoryContext ApplyContext = NULL;
293 :
294 : /* per stream context for streaming transactions */
295 : static MemoryContext LogicalStreamingContext = NULL;
296 :
297 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
298 :
299 : Subscription *MySubscription = NULL;
300 : static bool MySubscriptionValid = false;
301 :
302 : static List *on_commit_wakeup_workers_subids = NIL;
303 :
304 : bool in_remote_transaction = false;
305 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
306 :
307 : /* fields valid only when processing streamed transaction */
308 : static bool in_streamed_transaction = false;
309 :
310 : static TransactionId stream_xid = InvalidTransactionId;
311 :
312 : /*
313 : * The number of changes applied by parallel apply worker during one streaming
314 : * block.
315 : */
316 : static uint32 parallel_stream_nchanges = 0;
317 :
318 : /* Are we initializing an apply worker? */
319 : bool InitializingApplyWorker = false;
320 :
321 : /*
322 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
323 : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
324 : * Once we start skipping changes, we don't stop until we have skipped all changes
325 : * of the transaction, even if pg_subscription is updated and MySubscription->skiplsn
326 : * gets changed or reset in the meantime. Also, for streamed transactions (streaming = on),
327 : * we don't skip receiving and spooling the changes, since we decide whether or not
328 : * to skip applying the changes only when starting to apply them. The subskiplsn is
329 : * cleared after successfully skipping the transaction or after applying a non-empty
330 : * transaction. The latter prevents a mistakenly specified subskiplsn from being
331 : * left behind. Note that we cannot skip streamed transactions when using
332 : * parallel apply workers because we cannot get the finish LSN before applying
333 : * the changes. So, we don't start a parallel apply worker when the finish LSN is
334 : * set by the user.
335 : */
336 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
337 : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
338 :
339 : /* BufFile handle of the current streaming file */
340 : static BufFile *stream_fd = NULL;
341 :
342 : typedef struct SubXactInfo
343 : {
344 : TransactionId xid; /* XID of the subxact */
345 : int fileno; /* file number in the buffile */
346 : off_t offset; /* offset in the file */
347 : } SubXactInfo;
348 :
349 : /* Sub-transaction data for the current streaming transaction */
350 : typedef struct ApplySubXactData
351 : {
352 : uint32 nsubxacts; /* number of sub-transactions */
353 : uint32 nsubxacts_max; /* current capacity of subxacts */
354 : TransactionId subxact_last; /* xid of the last sub-transaction */
355 : SubXactInfo *subxacts; /* sub-xact offset in changes file */
356 : } ApplySubXactData;
357 :
358 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
359 :
360 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
361 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
362 :
363 : /*
364 : * Information about subtransactions of a given toplevel transaction.
365 : */
366 : static void subxact_info_write(Oid subid, TransactionId xid);
367 : static void subxact_info_read(Oid subid, TransactionId xid);
368 : static void subxact_info_add(TransactionId xid);
369 : static inline void cleanup_subxact_info(void);
370 :
371 : /*
372 : * Serialize and deserialize changes for a toplevel transaction.
373 : */
374 : static void stream_open_file(Oid subid, TransactionId xid,
375 : bool first_segment);
376 : static void stream_write_change(char action, StringInfo s);
377 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
378 : static void stream_close_file(void);
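
As a hedged illustration of the file-naming scheme mentioned in the header comment (the exact pattern is an assumption; the real subxact_filename/changes_filename definitions appear later in worker.c), each spool file name combines the subscription OID and the remote toplevel XID:

/* Hypothetical sketch only; a MAXPGPATH-sized buffer is assumed. */
static inline void
example_changes_filename(char *path, Oid subid, TransactionId xid)
{
    snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
}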
379 :
380 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
381 :
382 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
383 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
384 : ResultRelInfo *relinfo,
385 : TupleTableSlot *remoteslot);
386 : static void apply_handle_update_internal(ApplyExecutionData *edata,
387 : ResultRelInfo *relinfo,
388 : TupleTableSlot *remoteslot,
389 : LogicalRepTupleData *newtup,
390 : Oid localindexoid);
391 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
392 : ResultRelInfo *relinfo,
393 : TupleTableSlot *remoteslot,
394 : Oid localindexoid);
395 : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
396 : LogicalRepRelation *remoterel,
397 : Oid localidxoid,
398 : TupleTableSlot *remoteslot,
399 : TupleTableSlot **localslot);
400 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
401 : TupleTableSlot *remoteslot,
402 : LogicalRepTupleData *newtup,
403 : CmdType operation);
404 :
405 : /* Functions for skipping changes */
406 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
407 : static void stop_skipping_changes(void);
408 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
409 :
410 : /* Functions for apply error callback */
411 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
412 : static inline void reset_apply_error_context_info(void);
413 :
414 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
415 : ParallelApplyWorkerInfo **winfo);
416 :
417 : /*
418 : * Form the origin name for the subscription.
419 : *
420 : * This is a common function for tablesync and other workers. Tablesync workers
421 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
422 : *
423 : * Return the name in the supplied buffer.
424 : */
425 : void
426 2352 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
427 : char *originname, Size szoriginname)
428 : {
429 2352 : if (OidIsValid(relid))
430 : {
431 : /* Replication origin name for tablesync workers. */
432 1414 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
433 : }
434 : else
435 : {
436 : /* Replication origin name for non-tablesync workers. */
437 938 : snprintf(originname, szoriginname, "pg_%u", suboid);
438 : }
439 2352 : }
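
A hedged usage sketch of the function above (suboid and relid are assumed to be supplied by the caller):

char originname[NAMEDATALEN];

/* Leader apply worker: relid must be InvalidOid, yielding "pg_<suboid>". */
ReplicationOriginNameForLogicalRep(suboid, InvalidOid,
                                   originname, sizeof(originname));

/* Tablesync worker: pass the synced table's OID, yielding "pg_<suboid>_<relid>". */
ReplicationOriginNameForLogicalRep(suboid, relid,
                                   originname, sizeof(originname));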
440 :
441 : /*
442 : * Should this worker apply changes for the given relation?
443 : *
444 : * This is mainly needed for the initial relation data sync, as that runs in a
445 : * separate worker process running in parallel, and we need some way to skip
446 : * changes coming to the leader apply worker during the sync of a table.
447 : *
448 : * Note that we need a less-than-or-equal comparison for the SYNCDONE state
449 : * because it might hold the position of the end of the initial slot consistent
450 : * point WAL record + 1 (i.e. the start of the next record), and that next
451 : * record can be the COMMIT of the transaction we are now processing (which is
452 : * what we set remote_final_lsn to in apply_handle_begin).
453 : *
454 : * Note that for streaming transactions that are being applied in the parallel
455 : * apply worker, we disallow applying changes if the target table in the
456 : * subscription is not in the READY state, because we cannot decide whether to
457 : * apply the change as we won't know remote_final_lsn by that time.
458 : *
459 : * We already checked this in pa_can_start() before assigning the
460 : * streaming transaction to the parallel worker, but it also needs to be
461 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
462 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
463 : * while applying this transaction.
464 : */
465 : static bool
466 297538 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
467 : {
468 297538 : switch (MyLogicalRepWorker->type)
469 : {
470 0 : case WORKERTYPE_TABLESYNC:
471 0 : return MyLogicalRepWorker->relid == rel->localreloid;
472 :
473 138094 : case WORKERTYPE_PARALLEL_APPLY:
474 : /* We don't synchronize rel's that are in unknown state. */
475 138094 : if (rel->state != SUBREL_STATE_READY &&
476 0 : rel->state != SUBREL_STATE_UNKNOWN)
477 0 : ereport(ERROR,
478 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
479 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
480 : MySubscription->name),
481 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
482 :
483 138094 : return rel->state == SUBREL_STATE_READY;
484 :
485 159444 : case WORKERTYPE_APPLY:
486 159550 : return (rel->state == SUBREL_STATE_READY ||
487 106 : (rel->state == SUBREL_STATE_SYNCDONE &&
488 2 : rel->statelsn <= remote_final_lsn));
489 :
490 0 : case WORKERTYPE_UNKNOWN:
491 : /* Should never happen. */
492 0 : elog(ERROR, "Unknown worker type");
493 : }
494 :
495 0 : return false; /* dummy for compiler */
496 : }
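
A hedged sketch of the typical caller pattern (mirroring how the DML handlers later in this file use the check; "rel" is assumed to have been looked up via logicalrep_rel_open()):

if (!should_apply_changes_for_rel(rel))
{
    /*
     * The relation is not ready, or it is being synchronized by another
     * worker; skip this change and clean up.
     */
    logicalrep_rel_close(rel, RowExclusiveLock);
    end_replication_step();
    return;
}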
497 :
498 : /*
499 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
500 : *
501 : * Start a transaction, if this is the first step (else we keep using the
502 : * existing transaction).
503 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
504 : */
505 : static void
506 298494 : begin_replication_step(void)
507 : {
508 298494 : SetCurrentStatementStartTimestamp();
509 :
510 298494 : if (!IsTransactionState())
511 : {
512 1882 : StartTransactionCommand();
513 1882 : maybe_reread_subscription();
514 : }
515 :
516 298490 : PushActiveSnapshot(GetTransactionSnapshot());
517 :
518 298490 : MemoryContextSwitchTo(ApplyMessageContext);
519 298490 : }
520 :
521 : /*
522 : * Finish up one step of a replication transaction.
523 : * Callers of begin_replication_step() must also call this.
524 : *
525 : * We don't close out the transaction here, but we should increment
526 : * the command counter to make the effects of this step visible.
527 : */
528 : static void
529 298438 : end_replication_step(void)
530 : {
531 298438 : PopActiveSnapshot();
532 :
533 298438 : CommandCounterIncrement();
534 298438 : }
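
A hedged sketch of how the two helpers above bracket each replicated operation (apply_one_change is a hypothetical stand-in for the real per-message logic):

begin_replication_step();   /* starts a transaction on the first step, pushes a snapshot */
apply_one_change();         /* hypothetical: runs inside ApplyMessageContext */
end_replication_step();     /* pops the snapshot and advances the command counter */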
535 :
536 : /*
537 : * Handle streamed transactions for both the leader apply worker and the
538 : * parallel apply workers.
539 : *
540 : * In the streaming case (receiving a block of the streamed transaction), for
541 : * serialize mode, simply redirect it to a file for the proper toplevel
542 : * transaction, and for parallel mode, the leader apply worker will send the
543 : * changes to parallel apply workers and the parallel apply worker will define
544 : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
545 : * messages will be applied by both leader apply worker and parallel apply
546 : * workers).
547 : *
548 : * Returns true for streamed transactions (when the change was either serialized
549 : * to a file or sent to a parallel apply worker), false otherwise (regular mode,
550 : * or the change needs to be applied by the parallel apply worker itself).
551 : *
552 : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
553 : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
554 : * to a parallel apply worker.
555 : */
556 : static bool
557 650116 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
558 : {
559 : TransactionId current_xid;
560 : ParallelApplyWorkerInfo *winfo;
561 : TransApplyAction apply_action;
562 : StringInfoData original_msg;
563 :
564 650116 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
565 :
566 : /* not in streaming mode */
567 650116 : if (apply_action == TRANS_LEADER_APPLY)
568 160158 : return false;
569 :
570 : Assert(TransactionIdIsValid(stream_xid));
571 :
572 : /*
573 : * The parallel apply worker needs the xid in this message to decide
574 : * whether to define a savepoint, so save the original message that has
575 : * not moved the cursor after the xid. We will serialize this message to a
576 : * file in PARTIAL_SERIALIZE mode.
577 : */
578 489958 : original_msg = *s;
579 :
580 : /*
581 : * We should have received XID of the subxact as the first part of the
582 : * message, so extract it.
583 : */
584 489958 : current_xid = pq_getmsgint(s, 4);
585 :
586 489958 : if (!TransactionIdIsValid(current_xid))
587 0 : ereport(ERROR,
588 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
589 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
590 :
591 489958 : switch (apply_action)
592 : {
593 205024 : case TRANS_LEADER_SERIALIZE:
594 : Assert(stream_fd);
595 :
596 : /* Add the new subxact to the array (unless already there). */
597 205024 : subxact_info_add(current_xid);
598 :
599 : /* Write the change to the current file */
600 205024 : stream_write_change(action, s);
601 205024 : return true;
602 :
603 136772 : case TRANS_LEADER_SEND_TO_PARALLEL:
604 : Assert(winfo);
605 :
606 : /*
607 : * XXX The publisher side doesn't always send relation/type update
608 : * messages after the streaming transaction, so also update the
609 : * relation/type in leader apply worker. See function
610 : * cleanup_rel_sync_cache.
611 : */
612 136772 : if (pa_send_data(winfo, s->len, s->data))
613 136772 : return (action != LOGICAL_REP_MSG_RELATION &&
614 : action != LOGICAL_REP_MSG_TYPE);
615 :
616 : /*
617 : * Switch to serialize mode when we are not able to send the
618 : * change to parallel apply worker.
619 : */
620 0 : pa_switch_to_partial_serialize(winfo, false);
621 :
622 : /* fall through */
623 10012 : case TRANS_LEADER_PARTIAL_SERIALIZE:
624 10012 : stream_write_change(action, &original_msg);
625 :
626 : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
627 10012 : return (action != LOGICAL_REP_MSG_RELATION &&
628 : action != LOGICAL_REP_MSG_TYPE);
629 :
630 138150 : case TRANS_PARALLEL_APPLY:
631 138150 : parallel_stream_nchanges += 1;
632 :
633 : /* Define a savepoint for a subxact if needed. */
634 138150 : pa_start_subtrans(current_xid, stream_xid);
635 138150 : return false;
636 :
637 0 : default:
638 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
639 : return false; /* silence compiler warning */
640 : }
641 : }
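
A hedged sketch of the calling convention: each data-modification handler offers the message to handle_streamed_transaction() first and only applies it locally when that returns false (the names below are illustrative, not the actual handlers):

static void
example_apply_handle_change(LogicalRepMsgType action, StringInfo s)
{
    if (handle_streamed_transaction(action, s))
        return;                 /* spooled to a file or forwarded to a parallel worker */

    begin_replication_step();
    /* ... read the relation/tuple from "s" and apply it locally ... */
    end_replication_step();
}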
642 :
643 : /*
644 : * Executor state preparation for evaluation of constraint expressions,
645 : * indexes and triggers for the specified relation.
646 : *
647 : * Note that the caller must open and close any indexes to be updated.
648 : */
649 : static ApplyExecutionData *
650 297376 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
651 : {
652 : ApplyExecutionData *edata;
653 : EState *estate;
654 : RangeTblEntry *rte;
655 297376 : List *perminfos = NIL;
656 : ResultRelInfo *resultRelInfo;
657 :
658 297376 : edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
659 297376 : edata->targetRel = rel;
660 :
661 297376 : edata->estate = estate = CreateExecutorState();
662 :
663 297376 : rte = makeNode(RangeTblEntry);
664 297376 : rte->rtekind = RTE_RELATION;
665 297376 : rte->relid = RelationGetRelid(rel->localrel);
666 297376 : rte->relkind = rel->localrel->rd_rel->relkind;
667 297376 : rte->rellockmode = AccessShareLock;
668 :
669 297376 : addRTEPermissionInfo(&perminfos, rte);
670 :
671 297376 : ExecInitRangeTable(estate, list_make1(rte), perminfos);
672 :
673 297376 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
674 :
675 : /*
676 : * Use Relation opened by logicalrep_rel_open() instead of opening it
677 : * again.
678 : */
679 297376 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
680 :
681 : /*
682 : * We put the ResultRelInfo in the es_opened_result_relations list, even
683 : * though we don't populate the es_result_relations array. That's a bit
684 : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
685 : *
686 : * ExecOpenIndices() is not called here either, each execution path doing
687 : * an apply operation being responsible for that.
688 : */
689 297376 : estate->es_opened_result_relations =
690 297376 : lappend(estate->es_opened_result_relations, resultRelInfo);
691 :
692 297376 : estate->es_output_cid = GetCurrentCommandId(true);
693 :
694 : /* Prepare to catch AFTER triggers. */
695 297376 : AfterTriggerBeginQuery();
696 :
697 : /* other fields of edata remain NULL for now */
698 :
699 297376 : return edata;
700 : }
701 :
702 : /*
703 : * Finish any operations related to the executor state created by
704 : * create_edata_for_relation().
705 : */
706 : static void
707 297340 : finish_edata(ApplyExecutionData *edata)
708 : {
709 297340 : EState *estate = edata->estate;
710 :
711 : /* Handle any queued AFTER triggers. */
712 297340 : AfterTriggerEndQuery(estate);
713 :
714 : /* Shut down tuple routing, if any was done. */
715 297340 : if (edata->proute)
716 148 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
717 :
718 : /*
719 : * Cleanup. It might seem that we should call ExecCloseResultRelations()
720 : * here, but we intentionally don't. It would close the rel we added to
721 : * es_opened_result_relations above, which is wrong because we took no
722 : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
723 : * any other relations opened during execution.
724 : */
725 297340 : ExecResetTupleTable(estate->es_tupleTable, false);
726 297340 : FreeExecutorState(estate);
727 297340 : pfree(edata);
728 297340 : }
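
A hedged sketch of the lifecycle of this executor state, roughly following what the DML handlers later in this file do ("rel" is assumed to be the already-opened LogicalRepRelMapEntry):

ApplyExecutionData *edata = create_edata_for_relation(rel);
EState     *estate = edata->estate;
TupleTableSlot *remoteslot =
    ExecInitExtraTupleSlot(estate, RelationGetDescr(rel->localrel),
                           &TTSOpsVirtual);

/* ... fill remoteslot from the remote tuple and run the insert/update/delete ... */

finish_edata(edata);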
729 :
730 : /*
731 : * Evaluates default values for columns that we can't map to remote
732 : * relation columns.
733 : *
734 : * This allows us to support tables which have more columns on the downstream
735 : * than on the upstream.
736 : */
737 : static void
738 152876 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
739 : TupleTableSlot *slot)
740 : {
741 152876 : TupleDesc desc = RelationGetDescr(rel->localrel);
742 152876 : int num_phys_attrs = desc->natts;
743 : int i;
744 : int attnum,
745 152876 : num_defaults = 0;
746 : int *defmap;
747 : ExprState **defexprs;
748 : ExprContext *econtext;
749 :
750 152876 : econtext = GetPerTupleExprContext(estate);
751 :
752 : /* We got all the data via replication, no need to evaluate anything. */
753 152876 : if (num_phys_attrs == rel->remoterel.natts)
754 72588 : return;
755 :
756 80288 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
757 80288 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
758 :
759 : Assert(rel->attrmap->maplen == num_phys_attrs);
760 421310 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
761 : {
762 : Expr *defexpr;
763 :
764 341022 : if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
765 8 : continue;
766 :
767 341014 : if (rel->attrmap->attnums[attnum] >= 0)
768 184532 : continue;
769 :
770 156482 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
771 :
772 156482 : if (defexpr != NULL)
773 : {
774 : /* Run the expression through planner */
775 140262 : defexpr = expression_planner(defexpr);
776 :
777 : /* Initialize executable expression in copycontext */
778 140262 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
779 140262 : defmap[num_defaults] = attnum;
780 140262 : num_defaults++;
781 : }
782 : }
783 :
784 220550 : for (i = 0; i < num_defaults; i++)
785 140262 : slot->tts_values[defmap[i]] =
786 140262 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
787 : }
788 :
789 : /*
790 : * Store tuple data into slot.
791 : *
792 : * Incoming data can be either text or binary format.
793 : */
794 : static void
795 297392 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
796 : LogicalRepTupleData *tupleData)
797 : {
798 297392 : int natts = slot->tts_tupleDescriptor->natts;
799 : int i;
800 :
801 297392 : ExecClearTuple(slot);
802 :
803 : /* Call the "in" function for each non-dropped, non-null attribute */
804 : Assert(natts == rel->attrmap->maplen);
805 1317670 : for (i = 0; i < natts; i++)
806 : {
807 1020278 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
808 1020278 : int remoteattnum = rel->attrmap->attnums[i];
809 :
810 1020278 : if (!att->attisdropped && remoteattnum >= 0)
811 606458 : {
812 606458 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
813 :
814 : Assert(remoteattnum < tupleData->ncols);
815 :
816 : /* Set attnum for error callback */
817 606458 : apply_error_callback_arg.remote_attnum = remoteattnum;
818 :
819 606458 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
820 : {
821 : Oid typinput;
822 : Oid typioparam;
823 :
824 284492 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
825 568984 : slot->tts_values[i] =
826 284492 : OidInputFunctionCall(typinput, colvalue->data,
827 : typioparam, att->atttypmod);
828 284492 : slot->tts_isnull[i] = false;
829 : }
830 321966 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
831 : {
832 : Oid typreceive;
833 : Oid typioparam;
834 :
835 : /*
836 : * In some code paths we may be asked to re-parse the same
837 : * tuple data. Reset the StringInfo's cursor so that works.
838 : */
839 221324 : colvalue->cursor = 0;
840 :
841 221324 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
842 442648 : slot->tts_values[i] =
843 221324 : OidReceiveFunctionCall(typreceive, colvalue,
844 : typioparam, att->atttypmod);
845 :
846 : /* Trouble if it didn't eat the whole buffer */
847 221324 : if (colvalue->cursor != colvalue->len)
848 0 : ereport(ERROR,
849 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
850 : errmsg("incorrect binary data format in logical replication column %d",
851 : remoteattnum + 1)));
852 221324 : slot->tts_isnull[i] = false;
853 : }
854 : else
855 : {
856 : /*
857 : * NULL value from remote. (We don't expect to see
858 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
859 : * NULL.)
860 : */
861 100642 : slot->tts_values[i] = (Datum) 0;
862 100642 : slot->tts_isnull[i] = true;
863 : }
864 :
865 : /* Reset attnum for error callback */
866 606458 : apply_error_callback_arg.remote_attnum = -1;
867 : }
868 : else
869 : {
870 : /*
871 : * We assign NULL to dropped attributes and missing values
872 : * (missing values should be later filled using
873 : * slot_fill_defaults).
874 : */
875 413820 : slot->tts_values[i] = (Datum) 0;
876 413820 : slot->tts_isnull[i] = true;
877 : }
878 : }
879 :
880 297392 : ExecStoreVirtualTuple(slot);
881 297392 : }
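
A hedged sketch of how an INSERT handler typically combines slot_store_data() with slot_fill_defaults(), assuming "rel", "estate", "remoteslot" and "newtup" are provided by the surrounding handler:

MemoryContext oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

slot_store_data(remoteslot, rel, &newtup);      /* replicated columns -> slot */
slot_fill_defaults(rel, estate, remoteslot);    /* local-only columns -> defaults */

MemoryContextSwitchTo(oldctx);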
882 :
883 : /*
884 : * Replace updated columns with data from the LogicalRepTupleData struct.
885 : * This is somewhat similar to heap_modify_tuple but also calls the type
886 : * input functions on the user data.
887 : *
888 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
889 : * columns provided in "tupleData" and leaving others as-is.
890 : *
891 : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
892 : * storage for "srcslot". This is OK for current usage, but someday we may
893 : * need to materialize "slot" at the end to make it independent of "srcslot".
894 : */
895 : static void
896 63846 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
897 : LogicalRepRelMapEntry *rel,
898 : LogicalRepTupleData *tupleData)
899 : {
900 63846 : int natts = slot->tts_tupleDescriptor->natts;
901 : int i;
902 :
903 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
904 63846 : ExecClearTuple(slot);
905 :
906 : /*
907 : * Copy all the column data from srcslot, so that we'll have valid values
908 : * for unreplaced columns.
909 : */
910 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
911 63846 : slot_getallattrs(srcslot);
912 63846 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
913 63846 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
914 :
915 : /* Call the "in" function for each replaced attribute */
916 : Assert(natts == rel->attrmap->maplen);
917 318548 : for (i = 0; i < natts; i++)
918 : {
919 254702 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
920 254702 : int remoteattnum = rel->attrmap->attnums[i];
921 :
922 254702 : if (remoteattnum < 0)
923 117034 : continue;
924 :
925 : Assert(remoteattnum < tupleData->ncols);
926 :
927 137668 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
928 : {
929 137662 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
930 :
931 : /* Set attnum for error callback */
932 137662 : apply_error_callback_arg.remote_attnum = remoteattnum;
933 :
934 137662 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
935 : {
936 : Oid typinput;
937 : Oid typioparam;
938 :
939 50854 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
940 101708 : slot->tts_values[i] =
941 50854 : OidInputFunctionCall(typinput, colvalue->data,
942 : typioparam, att->atttypmod);
943 50854 : slot->tts_isnull[i] = false;
944 : }
945 86808 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
946 : {
947 : Oid typreceive;
948 : Oid typioparam;
949 :
950 : /*
951 : * In some code paths we may be asked to re-parse the same
952 : * tuple data. Reset the StringInfo's cursor so that works.
953 : */
954 86712 : colvalue->cursor = 0;
955 :
956 86712 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
957 173424 : slot->tts_values[i] =
958 86712 : OidReceiveFunctionCall(typreceive, colvalue,
959 : typioparam, att->atttypmod);
960 :
961 : /* Trouble if it didn't eat the whole buffer */
962 86712 : if (colvalue->cursor != colvalue->len)
963 0 : ereport(ERROR,
964 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
965 : errmsg("incorrect binary data format in logical replication column %d",
966 : remoteattnum + 1)));
967 86712 : slot->tts_isnull[i] = false;
968 : }
969 : else
970 : {
971 : /* must be LOGICALREP_COLUMN_NULL */
972 96 : slot->tts_values[i] = (Datum) 0;
973 96 : slot->tts_isnull[i] = true;
974 : }
975 :
976 : /* Reset attnum for error callback */
977 137662 : apply_error_callback_arg.remote_attnum = -1;
978 : }
979 : }
980 :
981 : /* And finally, declare that "slot" contains a valid virtual tuple */
982 63846 : ExecStoreVirtualTuple(slot);
983 63846 : }
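
And a corresponding hedged sketch for the UPDATE path, where "localslot" is assumed to hold the existing row located by FindReplTupleInLocalRel() and "newslot" receives the merged tuple:

MemoryContext oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

/* Build the new tuple: keep unreplaced columns from localslot, overlay newtup. */
slot_modify_data(newslot, localslot, rel, &newtup);

MemoryContextSwitchTo(oldctx);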
984 :
985 : /*
986 : * Handle BEGIN message.
987 : */
988 : static void
989 886 : apply_handle_begin(StringInfo s)
990 : {
991 : LogicalRepBeginData begin_data;
992 :
993 : /* There must not be an active streaming transaction. */
994 : Assert(!TransactionIdIsValid(stream_xid));
995 :
996 886 : logicalrep_read_begin(s, &begin_data);
997 886 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
998 :
999 886 : remote_final_lsn = begin_data.final_lsn;
1000 :
1001 886 : maybe_start_skipping_changes(begin_data.final_lsn);
1002 :
1003 886 : in_remote_transaction = true;
1004 :
1005 886 : pgstat_report_activity(STATE_RUNNING, NULL);
1006 886 : }
1007 :
1008 : /*
1009 : * Handle COMMIT message.
1010 : *
1011 : * TODO, support tracking of multiple origins
1012 : */
1013 : static void
1014 834 : apply_handle_commit(StringInfo s)
1015 : {
1016 : LogicalRepCommitData commit_data;
1017 :
1018 834 : logicalrep_read_commit(s, &commit_data);
1019 :
1020 834 : if (commit_data.commit_lsn != remote_final_lsn)
1021 0 : ereport(ERROR,
1022 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1023 : errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1024 : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1025 : LSN_FORMAT_ARGS(remote_final_lsn))));
1026 :
1027 834 : apply_handle_commit_internal(&commit_data);
1028 :
1029 : /* Process any tables that are being synchronized in parallel. */
1030 834 : process_syncing_tables(commit_data.end_lsn);
1031 :
1032 834 : pgstat_report_activity(STATE_IDLE, NULL);
1033 834 : reset_apply_error_context_info();
1034 834 : }
1035 :
1036 : /*
1037 : * Handle BEGIN PREPARE message.
1038 : */
1039 : static void
1040 32 : apply_handle_begin_prepare(StringInfo s)
1041 : {
1042 : LogicalRepPreparedTxnData begin_data;
1043 :
1044 : /* Tablesync should never receive prepare. */
1045 32 : if (am_tablesync_worker())
1046 0 : ereport(ERROR,
1047 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1048 : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1049 :
1050 : /* There must not be an active streaming transaction. */
1051 : Assert(!TransactionIdIsValid(stream_xid));
1052 :
1053 32 : logicalrep_read_begin_prepare(s, &begin_data);
1054 32 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1055 :
1056 32 : remote_final_lsn = begin_data.prepare_lsn;
1057 :
1058 32 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1059 :
1060 32 : in_remote_transaction = true;
1061 :
1062 32 : pgstat_report_activity(STATE_RUNNING, NULL);
1063 32 : }
1064 :
1065 : /*
1066 : * Common function to prepare the GID.
1067 : */
1068 : static void
1069 46 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1070 : {
1071 : char gid[GIDSIZE];
1072 :
1073 : /*
1074 : * Compute a unique GID for two_phase transactions. We don't use the GID of
1075 : * the prepared transaction sent by the server, as that can lead to deadlock
1076 : * when we have multiple subscriptions from the same node pointing to
1077 : * publications on the same node. See comments atop worker.c.
1078 : */
1079 46 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1080 : gid, sizeof(gid));
1081 :
1082 : /*
1083 : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1084 : * called within the PrepareTransactionBlock below.
1085 : */
1086 46 : if (!IsTransactionBlock())
1087 : {
1088 46 : BeginTransactionBlock();
1089 46 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1090 : }
1091 :
1092 : /*
1093 : * Update origin state so we can restart streaming from correct position
1094 : * in case of crash.
1095 : */
1096 46 : replorigin_session_origin_lsn = prepare_data->end_lsn;
1097 46 : replorigin_session_origin_timestamp = prepare_data->prepare_time;
1098 :
1099 46 : PrepareTransactionBlock(gid);
1100 46 : }
1101 :
1102 : /*
1103 : * Handle PREPARE message.
1104 : */
1105 : static void
1106 30 : apply_handle_prepare(StringInfo s)
1107 : {
1108 : LogicalRepPreparedTxnData prepare_data;
1109 :
1110 30 : logicalrep_read_prepare(s, &prepare_data);
1111 :
1112 30 : if (prepare_data.prepare_lsn != remote_final_lsn)
1113 0 : ereport(ERROR,
1114 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1115 : errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1116 : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1117 : LSN_FORMAT_ARGS(remote_final_lsn))));
1118 :
1119 : /*
1120 : * Unlike commit, here, we always prepare the transaction even though no
1121 : * change has happened in this transaction or all changes are skipped. It
1122 : * is done this way because at commit prepared time, we won't know whether
1123 : * we have skipped preparing a transaction because of those reasons.
1124 : *
1125 : * XXX, We can optimize such that at commit prepared time, we first check
1126 : * whether we have prepared the transaction or not but that doesn't seem
1127 : * worthwhile because such cases shouldn't be common.
1128 : */
1129 30 : begin_replication_step();
1130 :
1131 30 : apply_handle_prepare_internal(&prepare_data);
1132 :
1133 30 : end_replication_step();
1134 30 : CommitTransactionCommand();
1135 28 : pgstat_report_stat(false);
1136 :
1137 : /*
1138 : * It is okay not to set the local_end LSN for the prepare because we
1139 : * always flush the prepare record. So, we can send the acknowledgment of
1140 : * the remote_end LSN as soon as prepare is finished.
1141 : *
1142 : * XXX For the sake of consistency with commit, we could have set it with
1143 : * the LSN of prepare but as of now we don't track that value similar to
1144 : * XactLastCommitEnd, and adding it for this purpose doesn't seem worth
1145 : * it.
1146 : */
1147 28 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1148 :
1149 28 : in_remote_transaction = false;
1150 :
1151 : /* Process any tables that are being synchronized in parallel. */
1152 28 : process_syncing_tables(prepare_data.end_lsn);
1153 :
1154 : /*
1155 : * Since we have already prepared the transaction, in a case where the
1156 : * server crashes before clearing the subskiplsn, it will be left but the
1157 : * transaction won't be resent. But that's okay because it's a rare case
1158 : * and the subskiplsn will be cleared when finishing the next transaction.
1159 : */
1160 28 : stop_skipping_changes();
1161 28 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1162 :
1163 28 : pgstat_report_activity(STATE_IDLE, NULL);
1164 28 : reset_apply_error_context_info();
1165 28 : }
1166 :
1167 : /*
1168 : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1169 : *
1170 : * Note that we don't need to wait here if the transaction was prepared in a
1171 : * parallel apply worker. In that case, we have already waited for the prepare
1172 : * to finish in apply_handle_stream_prepare() which will ensure all the
1173 : * operations in that transaction have happened in the subscriber, so no
1174 : * concurrent transaction can cause deadlock or transaction dependency issues.
1175 : */
1176 : static void
1177 40 : apply_handle_commit_prepared(StringInfo s)
1178 : {
1179 : LogicalRepCommitPreparedTxnData prepare_data;
1180 : char gid[GIDSIZE];
1181 :
1182 40 : logicalrep_read_commit_prepared(s, &prepare_data);
1183 40 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1184 :
1185 : /* Compute GID for two_phase transactions. */
1186 40 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1187 : gid, sizeof(gid));
1188 :
1189 : /* There is no transaction when COMMIT PREPARED is called */
1190 40 : begin_replication_step();
1191 :
1192 : /*
1193 : * Update origin state so we can restart streaming from correct position
1194 : * in case of crash.
1195 : */
1196 40 : replorigin_session_origin_lsn = prepare_data.end_lsn;
1197 40 : replorigin_session_origin_timestamp = prepare_data.commit_time;
1198 :
1199 40 : FinishPreparedTransaction(gid, true);
1200 40 : end_replication_step();
1201 40 : CommitTransactionCommand();
1202 40 : pgstat_report_stat(false);
1203 :
1204 40 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1205 40 : in_remote_transaction = false;
1206 :
1207 : /* Process any tables that are being synchronized in parallel. */
1208 40 : process_syncing_tables(prepare_data.end_lsn);
1209 :
1210 40 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1211 :
1212 40 : pgstat_report_activity(STATE_IDLE, NULL);
1213 40 : reset_apply_error_context_info();
1214 40 : }
1215 :
1216 : /*
1217 : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1218 : *
1219 : * Note that we don't need to wait here if the transaction was prepared in a
1220 : * parallel apply worker. In that case, we have already waited for the prepare
1221 : * to finish in apply_handle_stream_prepare() which will ensure all the
1222 : * operations in that transaction have happened in the subscriber, so no
1223 : * concurrent transaction can cause deadlock or transaction dependency issues.
1224 : */
1225 : static void
1226 10 : apply_handle_rollback_prepared(StringInfo s)
1227 : {
1228 : LogicalRepRollbackPreparedTxnData rollback_data;
1229 : char gid[GIDSIZE];
1230 :
1231 10 : logicalrep_read_rollback_prepared(s, &rollback_data);
1232 10 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1233 :
1234 : /* Compute GID for two_phase transactions. */
1235 10 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1236 : gid, sizeof(gid));
1237 :
1238 : /*
1239 : * It is possible that we haven't received prepare because it occurred
1240 : * before walsender reached a consistent point or the two_phase was still
1241 : * not enabled by that time, so in such cases, we need to skip rollback
1242 : * prepared.
1243 : */
1244 10 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1245 : rollback_data.prepare_time))
1246 : {
1247 : /*
1248 : * Update origin state so we can restart streaming from correct
1249 : * position in case of crash.
1250 : */
1251 10 : replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1252 10 : replorigin_session_origin_timestamp = rollback_data.rollback_time;
1253 :
1254 : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1255 10 : begin_replication_step();
1256 10 : FinishPreparedTransaction(gid, false);
1257 10 : end_replication_step();
1258 10 : CommitTransactionCommand();
1259 :
1260 10 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1261 : }
1262 :
1263 10 : pgstat_report_stat(false);
1264 :
1265 : /*
1266 : * It is okay not to set the local_end LSN for the rollback of prepared
1267 : * transaction because we always flush the WAL record for it. See
1268 : * apply_handle_prepare.
1269 : */
1270 10 : store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
1271 10 : in_remote_transaction = false;
1272 :
1273 : /* Process any tables that are being synchronized in parallel. */
1274 10 : process_syncing_tables(rollback_data.rollback_end_lsn);
1275 :
1276 10 : pgstat_report_activity(STATE_IDLE, NULL);
1277 10 : reset_apply_error_context_info();
1278 10 : }
1279 :
1280 : /*
1281 : * Handle STREAM PREPARE.
1282 : */
1283 : static void
1284 22 : apply_handle_stream_prepare(StringInfo s)
1285 : {
1286 : LogicalRepPreparedTxnData prepare_data;
1287 : ParallelApplyWorkerInfo *winfo;
1288 : TransApplyAction apply_action;
1289 :
1290 : /* Save the message before it is consumed. */
1291 22 : StringInfoData original_msg = *s;
1292 :
1293 22 : if (in_streamed_transaction)
1294 0 : ereport(ERROR,
1295 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1296 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1297 :
1298 : /* Tablesync should never receive prepare. */
1299 22 : if (am_tablesync_worker())
1300 0 : ereport(ERROR,
1301 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1302 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1303 :
1304 22 : logicalrep_read_stream_prepare(s, &prepare_data);
1305 22 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1306 :
1307 22 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1308 :
1309 22 : switch (apply_action)
1310 : {
1311 10 : case TRANS_LEADER_APPLY:
1312 :
1313 : /*
1314 : * The transaction has been serialized to file, so replay all the
1315 : * spooled operations.
1316 : */
1317 10 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1318 : prepare_data.xid, prepare_data.prepare_lsn);
1319 :
1320 : /* Mark the transaction as prepared. */
1321 10 : apply_handle_prepare_internal(&prepare_data);
1322 :
1323 10 : CommitTransactionCommand();
1324 :
1325 : /*
1326 : * It is okay not to set the local_end LSN for the prepare because
1327 : * we always flush the prepare record. See apply_handle_prepare.
1328 : */
1329 10 : store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
1330 :
1331 10 : in_remote_transaction = false;
1332 :
1333 : /* Unlink the files with serialized changes and subxact info. */
1334 10 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1335 :
1336 10 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1337 10 : break;
1338 :
1339 4 : case TRANS_LEADER_SEND_TO_PARALLEL:
1340 : Assert(winfo);
1341 :
1342 4 : if (pa_send_data(winfo, s->len, s->data))
1343 : {
1344 : /* Finish processing the streaming transaction. */
1345 4 : pa_xact_finish(winfo, prepare_data.end_lsn);
1346 4 : break;
1347 : }
1348 :
1349 : /*
1350 : * Switch to serialize mode when we are not able to send the
1351 : * change to parallel apply worker.
1352 : */
1353 0 : pa_switch_to_partial_serialize(winfo, true);
1354 :
1355 : /* fall through */
1356 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1357 : Assert(winfo);
1358 :
1359 2 : stream_open_and_write_change(prepare_data.xid,
1360 : LOGICAL_REP_MSG_STREAM_PREPARE,
1361 : &original_msg);
1362 :
1363 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1364 :
1365 : /* Finish processing the streaming transaction. */
1366 2 : pa_xact_finish(winfo, prepare_data.end_lsn);
1367 2 : break;
1368 :
1369 6 : case TRANS_PARALLEL_APPLY:
1370 :
1371 : /*
1372 : * If the parallel apply worker is applying spooled messages then
1373 : * close the file before preparing.
1374 : */
1375 6 : if (stream_fd)
1376 2 : stream_close_file();
1377 :
1378 6 : begin_replication_step();
1379 :
1380 : /* Mark the transaction as prepared. */
1381 6 : apply_handle_prepare_internal(&prepare_data);
1382 :
1383 6 : end_replication_step();
1384 :
1385 6 : CommitTransactionCommand();
1386 :
1387 : /*
1388 : * It is okay not to set the local_end LSN for the prepare because
1389 : * we always flush the prepare record. See apply_handle_prepare.
1390 : */
1391 6 : MyParallelShared->last_commit_end = InvalidXLogRecPtr;
1392 :
1393 6 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1394 6 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1395 :
1396 6 : pa_reset_subtrans();
1397 :
1398 6 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1399 6 : break;
1400 :
1401 0 : default:
1402 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1403 : break;
1404 : }
1405 :
1406 22 : pgstat_report_stat(false);
1407 :
1408 : /* Process any tables that are being synchronized in parallel. */
1409 22 : process_syncing_tables(prepare_data.end_lsn);
1410 :
1411 : /*
1412 : * Similar to prepare case, the subskiplsn could be left in a case of
1413 : * server crash but it's okay. See the comments in apply_handle_prepare().
1414 : */
1415 22 : stop_skipping_changes();
1416 22 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1417 :
1418 22 : pgstat_report_activity(STATE_IDLE, NULL);
1419 :
1420 22 : reset_apply_error_context_info();
1421 22 : }
1422 :
1423 : /*
1424 : * Handle ORIGIN message.
1425 : *
1426 : * TODO, support tracking of multiple origins
1427 : */
1428 : static void
1429 16 : apply_handle_origin(StringInfo s)
1430 : {
1431 : /*
1432 : * ORIGIN message can only come inside streaming transaction or inside
1433 : * remote transaction and before any actual writes.
1434 : */
1435 16 : if (!in_streamed_transaction &&
1436 24 : (!in_remote_transaction ||
1437 12 : (IsTransactionState() && !am_tablesync_worker())))
1438 0 : ereport(ERROR,
1439 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1440 : errmsg_internal("ORIGIN message sent out of order")));
1441 16 : }
1442 :
1443 : /*
1444 : * Initialize fileset (if not already done).
1445 : *
1446 : * Create a new file when first_segment is true, otherwise open the existing
1447 : * file.
1448 : */
1449 : void
1450 774 : stream_start_internal(TransactionId xid, bool first_segment)
1451 : {
1452 774 : begin_replication_step();
1453 :
1454 : /*
1455 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1456 : * used for the entire duration of the worker so create it in a permanent
1457 : * context. We create this on the very first streaming message from any
1458 : * transaction and then use it for this and other streaming transactions.
1459 :      * Now, we could create a fileset at the start of the worker as well, but
1460 :      * then we would not be sure that it will ever be used.
1461 : */
1462 774 : if (!MyLogicalRepWorker->stream_fileset)
1463 : {
1464 : MemoryContext oldctx;
1465 :
1466 28 : oldctx = MemoryContextSwitchTo(ApplyContext);
1467 :
1468 28 : MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1469 28 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1470 :
1471 28 : MemoryContextSwitchTo(oldctx);
1472 : }
1473 :
1474 : /* Open the spool file for this transaction. */
1475 774 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1476 :
1477 : /* If this is not the first segment, open existing subxact file. */
1478 774 : if (!first_segment)
1479 710 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1480 :
1481 774 : end_replication_step();
1482 774 : }
1483 :
1484 : /*
1485 : * Handle STREAM START message.
1486 : */
1487 : static void
1488 1760 : apply_handle_stream_start(StringInfo s)
1489 : {
1490 : bool first_segment;
1491 : ParallelApplyWorkerInfo *winfo;
1492 : TransApplyAction apply_action;
1493 :
1494 : /* Save the message before it is consumed. */
1495 1760 : StringInfoData original_msg = *s;
1496 :
1497 1760 : if (in_streamed_transaction)
1498 0 : ereport(ERROR,
1499 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1500 : errmsg_internal("duplicate STREAM START message")));
1501 :
1502 : /* There must not be an active streaming transaction. */
1503 : Assert(!TransactionIdIsValid(stream_xid));
1504 :
1505 : /* notify handle methods we're processing a remote transaction */
1506 1760 : in_streamed_transaction = true;
1507 :
1508 : /* extract XID of the top-level transaction */
1509 1760 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1510 :
1511 1760 : if (!TransactionIdIsValid(stream_xid))
1512 0 : ereport(ERROR,
1513 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1514 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1515 :
1516 1760 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1517 :
1518 : /* Try to allocate a worker for the streaming transaction. */
1519 1760 : if (first_segment)
1520 164 : pa_allocate_worker(stream_xid);
1521 :
1522 1760 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1523 :
1524 1760 : switch (apply_action)
1525 : {
1526 734 : case TRANS_LEADER_SERIALIZE:
1527 :
1528 : /*
1529 :             * stream_start_internal starts a transaction. This transaction will
1530 :             * be committed on the stream stop unless we are a tablesync worker,
1531 :             * in which case it will be committed after processing all the
1532 :             * messages. We need this transaction for
1533 : * handling the BufFile, used for serializing the streaming data
1534 : * and subxact info.
1535 : */
1536 734 : stream_start_internal(stream_xid, first_segment);
1537 734 : break;
1538 :
1539 500 : case TRANS_LEADER_SEND_TO_PARALLEL:
1540 : Assert(winfo);
1541 :
1542 : /*
1543 : * Once we start serializing the changes, the parallel apply
1544 : * worker will wait for the leader to release the stream lock
1545 : * until the end of the transaction. So, we don't need to release
1546 : * the lock or increment the stream count in that case.
1547 : */
1548 500 : if (pa_send_data(winfo, s->len, s->data))
1549 : {
1550 : /*
1551 : * Unlock the shared object lock so that the parallel apply
1552 : * worker can continue to receive changes.
1553 : */
1554 492 : if (!first_segment)
1555 446 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1556 :
1557 : /*
1558 : * Increment the number of streaming blocks waiting to be
1559 : * processed by parallel apply worker.
1560 : */
1561 492 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1562 :
1563 : /* Cache the parallel apply worker for this transaction. */
1564 492 : pa_set_stream_apply_worker(winfo);
1565 492 : break;
1566 : }
1567 :
1568 : /*
1569 : * Switch to serialize mode when we are not able to send the
1570 : * change to parallel apply worker.
1571 : */
1572 8 : pa_switch_to_partial_serialize(winfo, !first_segment);
1573 :
1574 : /* fall through */
1575 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1576 : Assert(winfo);
1577 :
1578 : /*
1579 : * Open the spool file unless it was already opened when switching
1580 : * to serialize mode. The transaction started in
1581 : * stream_start_internal will be committed on the stream stop.
1582 : */
1583 30 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1584 22 : stream_start_internal(stream_xid, first_segment);
1585 :
1586 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1587 :
1588 : /* Cache the parallel apply worker for this transaction. */
1589 30 : pa_set_stream_apply_worker(winfo);
1590 30 : break;
1591 :
1592 504 : case TRANS_PARALLEL_APPLY:
1593 504 : if (first_segment)
1594 : {
1595 : /* Hold the lock until the end of the transaction. */
1596 54 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1597 54 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1598 :
1599 : /*
1600 : * Signal the leader apply worker, as it may be waiting for
1601 : * us.
1602 : */
1603 54 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
1604 : }
1605 :
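     :             /* Reset the count of changes applied in the current streaming chunk; it is reported at STREAM STOP. */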
1606 504 : parallel_stream_nchanges = 0;
1607 504 : break;
1608 :
1609 0 : default:
1610 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1611 : break;
1612 : }
1613 :
1614 1760 : pgstat_report_activity(STATE_RUNNING, NULL);
1615 1760 : }
1616 :
1617 : /*
1618 : * Update the information about subxacts and close the file.
1619 : *
1620 :  * This function should be called only after stream_start_internal has
1621 :  * been called.
1622 : */
1623 : void
1624 774 : stream_stop_internal(TransactionId xid)
1625 : {
1626 : /*
1627 : * Serialize information about subxacts for the toplevel transaction, then
1628 : * close the stream messages spool file.
1629 : */
1630 774 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1631 774 : stream_close_file();
1632 :
1633 : /* We must be in a valid transaction state */
1634 : Assert(IsTransactionState());
1635 :
1636 : /* Commit the per-stream transaction */
1637 774 : CommitTransactionCommand();
1638 :
1639 : /* Reset per-stream context */
1640 774 : MemoryContextReset(LogicalStreamingContext);
1641 774 : }
1642 :
1643 : /*
1644 : * Handle STREAM STOP message.
1645 : */
1646 : static void
1647 1758 : apply_handle_stream_stop(StringInfo s)
1648 : {
1649 : ParallelApplyWorkerInfo *winfo;
1650 : TransApplyAction apply_action;
1651 :
1652 1758 : if (!in_streamed_transaction)
1653 0 : ereport(ERROR,
1654 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1655 : errmsg_internal("STREAM STOP message without STREAM START")));
1656 :
1657 1758 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1658 :
1659 1758 : switch (apply_action)
1660 : {
1661 734 : case TRANS_LEADER_SERIALIZE:
1662 734 : stream_stop_internal(stream_xid);
1663 734 : break;
1664 :
1665 492 : case TRANS_LEADER_SEND_TO_PARALLEL:
1666 : Assert(winfo);
1667 :
1668 : /*
1669 : * Lock before sending the STREAM_STOP message so that the leader
1670 : * can hold the lock first and the parallel apply worker will wait
1671 : * for leader to release the lock. See Locking Considerations atop
1672 : * applyparallelworker.c.
1673 : */
1674 492 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1675 :
1676 492 : if (pa_send_data(winfo, s->len, s->data))
1677 : {
1678 492 : pa_set_stream_apply_worker(NULL);
1679 492 : break;
1680 : }
1681 :
1682 : /*
1683 : * Switch to serialize mode when we are not able to send the
1684 : * change to parallel apply worker.
1685 : */
1686 0 : pa_switch_to_partial_serialize(winfo, true);
1687 :
1688 : /* fall through */
1689 30 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1690 30 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1691 30 : stream_stop_internal(stream_xid);
1692 30 : pa_set_stream_apply_worker(NULL);
1693 30 : break;
1694 :
1695 502 : case TRANS_PARALLEL_APPLY:
1696 502 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1697 : parallel_stream_nchanges);
1698 :
1699 : /*
1700 :             * By the time the parallel apply worker is processing the changes in
1701 :             * the current streaming block, the leader apply worker may have
1702 :             * sent multiple streaming blocks. This can lead to the parallel apply
1703 :             * worker starting to wait even when there are more chunks of streams
1704 :             * in the queue. So, try to lock only if there is no message left
1705 : * in the queue. See Locking Considerations atop
1706 : * applyparallelworker.c.
1707 : *
1708 : * Note that here we have a race condition where we can start
1709 : * waiting even when there are pending streaming chunks. This can
1710 : * happen if the leader sends another streaming block and acquires
1711 : * the stream lock again after the parallel apply worker checks
1712 : * that there is no pending streaming block and before it actually
1713 : * starts waiting on a lock. We can handle this case by not
1714 : * allowing the leader to increment the stream block count during
1715 : * the time parallel apply worker acquires the lock but it is not
1716 : * clear whether that is worth the complexity.
1717 : *
1718 :             * Now, if this missed chunk contains a rollback to savepoint, then
1719 :             * there is a risk of deadlock, which probably shouldn't happen
1720 :             * after a restart.
1721 : */
1722 502 : pa_decr_and_wait_stream_block();
1723 498 : break;
1724 :
1725 0 : default:
1726 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1727 : break;
1728 : }
1729 :
1730 1754 : in_streamed_transaction = false;
1731 1754 : stream_xid = InvalidTransactionId;
1732 :
1733 : /*
1734 : * The parallel apply worker could be in a transaction in which case we
1735 : * need to report the state as STATE_IDLEINTRANSACTION.
1736 : */
1737 1754 : if (IsTransactionOrTransactionBlock())
1738 498 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1739 : else
1740 1256 : pgstat_report_activity(STATE_IDLE, NULL);
1741 :
1742 1754 : reset_apply_error_context_info();
1743 1754 : }
1744 :
1745 : /*
1746 : * Helper function to handle STREAM ABORT message when the transaction was
1747 : * serialized to file.
1748 : */
1749 : static void
1750 28 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1751 : {
1752 : /*
1753 :      * If the two XIDs are the same, it's in fact an abort of the toplevel
1754 :      * xact, so just delete the files with serialized info.
1755 : */
1756 28 : if (xid == subxid)
1757 2 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1758 : else
1759 : {
1760 : /*
1761 : * OK, so it's a subxact. We need to read the subxact file for the
1762 : * toplevel transaction, determine the offset tracked for the subxact,
1763 : * and truncate the file with changes. We also remove the subxacts
1764 : * with higher offsets (or rather higher XIDs).
1765 : *
1766 : * We intentionally scan the array from the tail, because we're likely
1767 : * aborting a change for the most recent subtransactions.
1768 : *
1769 :          * We can't use binary search here as subxact XIDs won't
1770 :          * necessarily arrive in sorted order; consider the case where we have
1771 :          * released the savepoint for multiple subtransactions and then
1772 :          * performed a rollback to savepoint for one of the earlier
1773 :          * subtransactions.
1774 : */
1775 : int64 i;
1776 : int64 subidx;
1777 : BufFile *fd;
1778 26 : bool found = false;
1779 : char path[MAXPGPATH];
1780 :
1781 26 : subidx = -1;
1782 26 : begin_replication_step();
1783 26 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1784 :
1785 30 : for (i = subxact_data.nsubxacts; i > 0; i--)
1786 : {
1787 22 : if (subxact_data.subxacts[i - 1].xid == subxid)
1788 : {
1789 18 : subidx = (i - 1);
1790 18 : found = true;
1791 18 : break;
1792 : }
1793 : }
1794 :
1795 : /*
1796 :          * If it's an empty sub-transaction then we will not find the subxid
1797 :          * here, so just clean up the subxact info and return.
1798 : */
1799 26 : if (!found)
1800 : {
1801 : /* Cleanup the subxact info */
1802 8 : cleanup_subxact_info();
1803 8 : end_replication_step();
1804 8 : CommitTransactionCommand();
1805 8 : return;
1806 : }
1807 :
1808 : /* open the changes file */
1809 18 : changes_filename(path, MyLogicalRepWorker->subid, xid);
1810 18 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
1811 : O_RDWR, false);
1812 :
1813 : /* OK, truncate the file at the right offset */
1814 18 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
1815 18 : subxact_data.subxacts[subidx].offset);
1816 18 : BufFileClose(fd);
1817 :
1818 : /* discard the subxacts added later */
1819 18 : subxact_data.nsubxacts = subidx;
1820 :
1821 : /* write the updated subxact list */
1822 18 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1823 :
1824 18 : end_replication_step();
1825 18 : CommitTransactionCommand();
1826 : }
1827 : }
1828 :
1829 : /*
1830 : * Handle STREAM ABORT message.
1831 : */
1832 : static void
1833 76 : apply_handle_stream_abort(StringInfo s)
1834 : {
1835 : TransactionId xid;
1836 : TransactionId subxid;
1837 : LogicalRepStreamAbortData abort_data;
1838 : ParallelApplyWorkerInfo *winfo;
1839 : TransApplyAction apply_action;
1840 :
1841 : /* Save the message before it is consumed. */
1842 76 : StringInfoData original_msg = *s;
1843 : bool toplevel_xact;
1844 :
1845 76 : if (in_streamed_transaction)
1846 0 : ereport(ERROR,
1847 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1848 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
1849 :
1850 : /* We receive abort information only when we can apply in parallel. */
1851 76 : logicalrep_read_stream_abort(s, &abort_data,
1852 76 : MyLogicalRepWorker->parallel_apply);
1853 :
1854 76 : xid = abort_data.xid;
1855 76 : subxid = abort_data.subxid;
1856 76 : toplevel_xact = (xid == subxid);
1857 :
1858 76 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1859 :
1860 76 : apply_action = get_transaction_apply_action(xid, &winfo);
1861 :
1862 76 : switch (apply_action)
1863 : {
1864 28 : case TRANS_LEADER_APPLY:
1865 :
1866 : /*
1867 : * We are in the leader apply worker and the transaction has been
1868 : * serialized to file.
1869 : */
1870 28 : stream_abort_internal(xid, subxid);
1871 :
1872 28 : elog(DEBUG1, "finished processing the STREAM ABORT command");
1873 28 : break;
1874 :
1875 20 : case TRANS_LEADER_SEND_TO_PARALLEL:
1876 : Assert(winfo);
1877 :
1878 : /*
1879 : * For the case of aborting the subtransaction, we increment the
1880 : * number of streaming blocks and take the lock again before
1881 : * sending the STREAM_ABORT to ensure that the parallel apply
1882 : * worker will wait on the lock for the next set of changes after
1883 : * processing the STREAM_ABORT message if it is not already
1884 : * waiting for STREAM_STOP message.
1885 : *
1886 : * It is important to perform this locking before sending the
1887 : * STREAM_ABORT message so that the leader can hold the lock first
1888 : * and the parallel apply worker will wait for the leader to
1889 : * release the lock. This is the same as what we do in
1890 : * apply_handle_stream_stop. See Locking Considerations atop
1891 : * applyparallelworker.c.
1892 : */
1893 20 : if (!toplevel_xact)
1894 : {
1895 18 : pa_unlock_stream(xid, AccessExclusiveLock);
1896 18 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1897 18 : pa_lock_stream(xid, AccessExclusiveLock);
1898 : }
1899 :
1900 20 : if (pa_send_data(winfo, s->len, s->data))
1901 : {
1902 : /*
1903 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1904 : * wait here for the parallel apply worker to finish as that
1905 : * is not required to maintain the commit order and won't have
1906 : * the risk of failures due to transaction dependencies and
1907 : * deadlocks. However, it is possible that before the parallel
1908 : * worker finishes and we clear the worker info, the xid
1909 : * wraparound happens on the upstream and a new transaction
1910 : * with the same xid can appear and that can lead to duplicate
1911 : * entries in ParallelApplyTxnHash. Yet another problem could
1912 : * be that we may have serialized the changes in partial
1913 : * serialize mode and the file containing xact changes may
1914 : * already exist, and after xid wraparound trying to create
1915 : * the file for the same xid can lead to an error. To avoid
1916 : * these problems, we decide to wait for the aborts to finish.
1917 : *
1918 :                 * Note, it is okay to not update the flush location for
1919 :                 * aborts; in the worst case that means such a transaction
1920 :                 * won't be sent again after a restart.
1921 : */
1922 20 : if (toplevel_xact)
1923 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
1924 :
1925 20 : break;
1926 : }
1927 :
1928 : /*
1929 : * Switch to serialize mode when we are not able to send the
1930 : * change to parallel apply worker.
1931 : */
1932 0 : pa_switch_to_partial_serialize(winfo, true);
1933 :
1934 : /* fall through */
1935 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1936 : Assert(winfo);
1937 :
1938 : /*
1939 : * Parallel apply worker might have applied some changes, so write
1940 : * the STREAM_ABORT message so that it can rollback the
1941 : * subtransaction if needed.
1942 : */
1943 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
1944 : &original_msg);
1945 :
1946 4 : if (toplevel_xact)
1947 : {
1948 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1949 2 : pa_xact_finish(winfo, InvalidXLogRecPtr);
1950 : }
1951 4 : break;
1952 :
1953 24 : case TRANS_PARALLEL_APPLY:
1954 :
1955 : /*
1956 : * If the parallel apply worker is applying spooled messages then
1957 : * close the file before aborting.
1958 : */
1959 24 : if (toplevel_xact && stream_fd)
1960 2 : stream_close_file();
1961 :
1962 24 : pa_stream_abort(&abort_data);
1963 :
1964 : /*
1965 : * We need to wait after processing rollback to savepoint for the
1966 : * next set of changes.
1967 : *
1968 : * We have a race condition here due to which we can start waiting
1969 : * here when there are more chunk of streams in the queue. See
1970 : * apply_handle_stream_stop.
1971 : */
1972 24 : if (!toplevel_xact)
1973 20 : pa_decr_and_wait_stream_block();
1974 :
1975 24 : elog(DEBUG1, "finished processing the STREAM ABORT command");
1976 24 : break;
1977 :
1978 0 : default:
1979 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1980 : break;
1981 : }
1982 :
1983 76 : reset_apply_error_context_info();
1984 76 : }
1985 :
1986 : /*
1987 :  * Ensure that the passed location is the fileset's end.
1988 : */
1989 : static void
1990 8 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1991 : off_t offset)
1992 : {
1993 : char path[MAXPGPATH];
1994 : BufFile *fd;
1995 : int last_fileno;
1996 : off_t last_offset;
1997 :
1998 : Assert(!IsTransactionState());
1999 :
2000 8 : begin_replication_step();
2001 :
2002 8 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2003 :
2004 8 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2005 :
2006 8 : BufFileSeek(fd, 0, 0, SEEK_END);
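     :     /* Seek to the end of the spool file and record its fileno/offset. */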
2007 8 : BufFileTell(fd, &last_fileno, &last_offset);
2008 :
2009 8 : BufFileClose(fd);
2010 :
2011 8 : end_replication_step();
2012 :
2013 8 : if (last_fileno != fileno || last_offset != offset)
2014 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2015 : path);
2016 8 : }
2017 :
2018 : /*
2019 : * Common spoolfile processing.
2020 : */
2021 : void
2022 62 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2023 : XLogRecPtr lsn)
2024 : {
2025 : int nchanges;
2026 : char path[MAXPGPATH];
2027 62 : char *buffer = NULL;
2028 : MemoryContext oldcxt;
2029 : ResourceOwner oldowner;
2030 : int fileno;
2031 : off_t offset;
2032 :
2033 62 : if (!am_parallel_apply_worker())
2034 54 : maybe_start_skipping_changes(lsn);
2035 :
2036 : /* Make sure we have an open transaction */
2037 62 : begin_replication_step();
2038 :
2039 : /*
2040 :      * Allocate the file handle and the memory required to process all the
2041 :      * messages in TopTransactionContext so that they do not get reset after
2042 :      * each message is processed.
2043 : */
2044 62 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2045 :
2046 : /* Open the spool file for the committed/prepared transaction */
2047 62 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2048 62 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2049 :
2050 : /*
2051 : * Make sure the file is owned by the toplevel transaction so that the
2052 : * file will not be accidentally closed when aborting a subtransaction.
2053 : */
2054 62 : oldowner = CurrentResourceOwner;
2055 62 : CurrentResourceOwner = TopTransactionResourceOwner;
2056 :
2057 62 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2058 :
2059 62 : CurrentResourceOwner = oldowner;
2060 :
2061 62 : buffer = palloc(BLCKSZ);
2062 :
2063 62 : MemoryContextSwitchTo(oldcxt);
2064 :
2065 62 : remote_final_lsn = lsn;
2066 :
2067 : /*
2068 :      * Make sure the handler methods called via apply_dispatch are aware we're
2069 :      * in a remote transaction.
2070 : */
2071 62 : in_remote_transaction = true;
2072 62 : pgstat_report_activity(STATE_RUNNING, NULL);
2073 :
2074 62 : end_replication_step();
2075 :
2076 : /*
2077 : * Read the entries one by one and pass them through the same logic as in
2078 : * apply_dispatch.
2079 : */
2080 62 : nchanges = 0;
2081 : while (true)
2082 176938 : {
2083 : StringInfoData s2;
2084 : size_t nbytes;
2085 : int len;
2086 :
2087 177000 : CHECK_FOR_INTERRUPTS();
2088 :
2089 : /* read length of the on-disk record */
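     :         /*
     :          * Each spooled record is length-prefixed: an int holding the size of
     :          * the change message, followed by that many bytes of the serialized
     :          * logical replication message. The reads below rely on this layout.
     :          */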
2090 177000 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2091 :
2092 : /* have we reached end of the file? */
2093 177000 : if (nbytes == 0)
2094 52 : break;
2095 :
2096 : /* do we have a correct length? */
2097 176948 : if (len <= 0)
2098 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2099 : len, path);
2100 :
2101 : /* make sure we have sufficiently large buffer */
2102 176948 : buffer = repalloc(buffer, len);
2103 :
2104 : /* and finally read the data into the buffer */
2105 176948 : BufFileReadExact(stream_fd, buffer, len);
2106 :
2107 176948 : BufFileTell(stream_fd, &fileno, &offset);
2108 :
2109 : /* init a stringinfo using the buffer and call apply_dispatch */
2110 176948 : initReadOnlyStringInfo(&s2, buffer, len);
2111 :
2112 : /* Ensure we are reading the data into our memory context. */
2113 176948 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2114 :
2115 176948 : apply_dispatch(&s2);
2116 :
2117 176946 : MemoryContextReset(ApplyMessageContext);
2118 :
2119 176946 : MemoryContextSwitchTo(oldcxt);
2120 :
2121 176946 : nchanges++;
2122 :
2123 : /*
2124 :          * It is possible the file has been closed because we have processed a
2125 :          * transaction end message such as stream_commit, in which case that
2126 :          * must be the last message.
2127 : */
2128 176946 : if (!stream_fd)
2129 : {
2130 8 : ensure_last_message(stream_fileset, xid, fileno, offset);
2131 8 : break;
2132 : }
2133 :
2134 176938 : if (nchanges % 1000 == 0)
2135 166 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2136 : nchanges, path);
2137 : }
2138 :
2139 60 : if (stream_fd)
2140 52 : stream_close_file();
2141 :
2142 60 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2143 : nchanges, path);
2144 :
2145 60 : return;
2146 : }
2147 :
2148 : /*
2149 : * Handle STREAM COMMIT message.
2150 : */
2151 : static void
2152 122 : apply_handle_stream_commit(StringInfo s)
2153 : {
2154 : TransactionId xid;
2155 : LogicalRepCommitData commit_data;
2156 : ParallelApplyWorkerInfo *winfo;
2157 : TransApplyAction apply_action;
2158 :
2159 : /* Save the message before it is consumed. */
2160 122 : StringInfoData original_msg = *s;
2161 :
2162 122 : if (in_streamed_transaction)
2163 0 : ereport(ERROR,
2164 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2165 : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2166 :
2167 122 : xid = logicalrep_read_stream_commit(s, &commit_data);
2168 122 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2169 :
2170 122 : apply_action = get_transaction_apply_action(xid, &winfo);
2171 :
2172 122 : switch (apply_action)
2173 : {
2174 44 : case TRANS_LEADER_APPLY:
2175 :
2176 : /*
2177 : * The transaction has been serialized to file, so replay all the
2178 : * spooled operations.
2179 : */
2180 44 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2181 : commit_data.commit_lsn);
2182 :
2183 42 : apply_handle_commit_internal(&commit_data);
2184 :
2185 : /* Unlink the files with serialized changes and subxact info. */
2186 42 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2187 :
2188 42 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2189 42 : break;
2190 :
2191 36 : case TRANS_LEADER_SEND_TO_PARALLEL:
2192 : Assert(winfo);
2193 :
2194 36 : if (pa_send_data(winfo, s->len, s->data))
2195 : {
2196 : /* Finish processing the streaming transaction. */
2197 36 : pa_xact_finish(winfo, commit_data.end_lsn);
2198 34 : break;
2199 : }
2200 :
2201 : /*
2202 : * Switch to serialize mode when we are not able to send the
2203 : * change to parallel apply worker.
2204 : */
2205 0 : pa_switch_to_partial_serialize(winfo, true);
2206 :
2207 : /* fall through */
2208 4 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2209 : Assert(winfo);
2210 :
2211 4 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2212 : &original_msg);
2213 :
2214 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2215 :
2216 : /* Finish processing the streaming transaction. */
2217 4 : pa_xact_finish(winfo, commit_data.end_lsn);
2218 4 : break;
2219 :
2220 38 : case TRANS_PARALLEL_APPLY:
2221 :
2222 : /*
2223 : * If the parallel apply worker is applying spooled messages then
2224 : * close the file before committing.
2225 : */
2226 38 : if (stream_fd)
2227 4 : stream_close_file();
2228 :
2229 38 : apply_handle_commit_internal(&commit_data);
2230 :
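     :             /*
     :              * Pass the local commit end LSN back to the leader via shared
     :              * memory (cf. the prepare case, which leaves it invalid because
     :              * the prepare record is always flushed).
     :              */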
2231 38 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2232 :
2233 : /*
2234 : * It is important to set the transaction state as finished before
2235 : * releasing the lock. See pa_wait_for_xact_finish.
2236 : */
2237 38 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2238 38 : pa_unlock_transaction(xid, AccessExclusiveLock);
2239 :
2240 38 : pa_reset_subtrans();
2241 :
2242 38 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2243 38 : break;
2244 :
2245 0 : default:
2246 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2247 : break;
2248 : }
2249 :
2250 : /* Process any tables that are being synchronized in parallel. */
2251 118 : process_syncing_tables(commit_data.end_lsn);
2252 :
2253 118 : pgstat_report_activity(STATE_IDLE, NULL);
2254 :
2255 118 : reset_apply_error_context_info();
2256 118 : }
2257 :
2258 : /*
2259 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2260 : */
2261 : static void
2262 914 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2263 : {
2264 914 : if (is_skipping_changes())
2265 : {
2266 4 : stop_skipping_changes();
2267 :
2268 : /*
2269 : * Start a new transaction to clear the subskiplsn, if not started
2270 : * yet.
2271 : */
2272 4 : if (!IsTransactionState())
2273 2 : StartTransactionCommand();
2274 : }
2275 :
2276 914 : if (IsTransactionState())
2277 : {
2278 : /*
2279 : * The transaction is either non-empty or skipped, so we clear the
2280 : * subskiplsn.
2281 : */
2282 914 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2283 :
2284 : /*
2285 : * Update origin state so we can restart streaming from correct
2286 : * position in case of crash.
2287 : */
2288 914 : replorigin_session_origin_lsn = commit_data->end_lsn;
2289 914 : replorigin_session_origin_timestamp = commit_data->committime;
2290 :
2291 914 : CommitTransactionCommand();
2292 :
2293 914 : if (IsTransactionBlock())
2294 : {
2295 8 : EndTransactionBlock(false);
2296 8 : CommitTransactionCommand();
2297 : }
2298 :
2299 914 : pgstat_report_stat(false);
2300 :
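     :         /*
     :          * Remember the remote end LSN together with the local commit LSN so
     :          * they can be used later when computing the flush position to report
     :          * back to the publisher.
     :          */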
2301 914 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2302 : }
2303 : else
2304 : {
2305 : /* Process any invalidation messages that might have accumulated. */
2306 0 : AcceptInvalidationMessages();
2307 0 : maybe_reread_subscription();
2308 : }
2309 :
2310 914 : in_remote_transaction = false;
2311 914 : }
2312 :
2313 : /*
2314 : * Handle RELATION message.
2315 : *
2316 :  * Note we don't do validation against the local schema here. That
2317 :  * validation is postponed until the first change for the given relation
2318 :  * arrives, as we only care about it when applying changes for it anyway,
2319 :  * and we do less locking this way.
2320 : */
2321 : static void
2322 860 : apply_handle_relation(StringInfo s)
2323 : {
2324 : LogicalRepRelation *rel;
2325 :
2326 860 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
2327 70 : return;
2328 :
2329 790 : rel = logicalrep_read_rel(s);
2330 790 : logicalrep_relmap_update(rel);
2331 :
2332 : /* Also reset all entries in the partition map that refer to remoterel. */
2333 790 : logicalrep_partmap_reset_relmap(rel);
2334 : }
2335 :
2336 : /*
2337 : * Handle TYPE message.
2338 : *
2339 : * This implementation pays no attention to TYPE messages; we expect the user
2340 : * to have set things up so that the incoming data is acceptable to the input
2341 : * functions for the locally subscribed tables. Hence, we just read and
2342 : * discard the message.
2343 : */
2344 : static void
2345 36 : apply_handle_type(StringInfo s)
2346 : {
2347 : LogicalRepTyp typ;
2348 :
2349 36 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
2350 0 : return;
2351 :
2352 36 : logicalrep_read_typ(s, &typ);
2353 : }
2354 :
2355 : /*
2356 : * Check that we (the subscription owner) have sufficient privileges on the
2357 : * target relation to perform the given operation.
2358 : */
2359 : static void
2360 441912 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2361 : {
2362 : Oid relid;
2363 : AclResult aclresult;
2364 :
2365 441912 : relid = RelationGetRelid(rel);
2366 441912 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2367 441912 : if (aclresult != ACLCHECK_OK)
2368 14 : aclcheck_error(aclresult,
2369 14 : get_relkind_objtype(rel->rd_rel->relkind),
2370 14 : get_rel_name(relid));
2371 :
2372 : /*
2373 : * We lack the infrastructure to honor RLS policies. It might be possible
2374 : * to add such infrastructure here, but tablesync workers lack it, too, so
2375 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2376 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2377 : * replicate subsequent INSERTs, so we forbid all commands the same.
2378 : */
2379 441898 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2380 6 : ereport(ERROR,
2381 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2382 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2383 : GetUserNameFromId(GetUserId(), true),
2384 : RelationGetRelationName(rel))));
2385 441892 : }
2386 :
2387 : /*
2388 : * Handle INSERT message.
2389 : */
2390 :
2391 : static void
2392 373010 : apply_handle_insert(StringInfo s)
2393 : {
2394 : LogicalRepRelMapEntry *rel;
2395 : LogicalRepTupleData newtup;
2396 : LogicalRepRelId relid;
2397 : UserContext ucxt;
2398 : ApplyExecutionData *edata;
2399 : EState *estate;
2400 : TupleTableSlot *remoteslot;
2401 : MemoryContext oldctx;
2402 : bool run_as_owner;
2403 :
2404 : /*
2405 : * Quick return if we are skipping data modification changes or handling
2406 : * streamed transactions.
2407 : */
2408 726018 : if (is_skipping_changes() ||
2409 353008 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
2410 220118 : return;
2411 :
2412 152996 : begin_replication_step();
2413 :
2414 152994 : relid = logicalrep_read_insert(s, &newtup);
2415 152994 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2416 152980 : if (!should_apply_changes_for_rel(rel))
2417 : {
2418 : /*
2419 : * The relation can't become interesting in the middle of the
2420 : * transaction so it's safe to unlock it.
2421 : */
2422 104 : logicalrep_rel_close(rel, RowExclusiveLock);
2423 104 : end_replication_step();
2424 104 : return;
2425 : }
2426 :
2427 : /*
2428 : * Make sure that any user-supplied code runs as the table owner, unless
2429 : * the user has opted out of that behavior.
2430 : */
2431 152876 : run_as_owner = MySubscription->runasowner;
2432 152876 : if (!run_as_owner)
2433 152860 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2434 :
2435 : /* Set relation for error callback */
2436 152876 : apply_error_callback_arg.rel = rel;
2437 :
2438 : /* Initialize the executor state. */
2439 152876 : edata = create_edata_for_relation(rel);
2440 152876 : estate = edata->estate;
2441 152876 : remoteslot = ExecInitExtraTupleSlot(estate,
2442 152876 : RelationGetDescr(rel->localrel),
2443 : &TTSOpsVirtual);
2444 :
2445 : /* Process and store remote tuple in the slot */
2446 152876 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2447 152876 : slot_store_data(remoteslot, rel, &newtup);
2448 152876 : slot_fill_defaults(rel, estate, remoteslot);
2449 152876 : MemoryContextSwitchTo(oldctx);
2450 :
2451 : /* For a partitioned table, insert the tuple into a partition. */
2452 152876 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2453 88 : apply_handle_tuple_routing(edata,
2454 : remoteslot, NULL, CMD_INSERT);
2455 : else
2456 152788 : apply_handle_insert_internal(edata, edata->targetRelInfo,
2457 : remoteslot);
2458 :
2459 152850 : finish_edata(edata);
2460 :
2461 : /* Reset relation for error callback */
2462 152850 : apply_error_callback_arg.rel = NULL;
2463 :
2464 152850 : if (!run_as_owner)
2465 152840 : RestoreUserContext(&ucxt);
2466 :
2467 152850 : logicalrep_rel_close(rel, NoLock);
2468 :
2469 152850 : end_replication_step();
2470 : }
2471 :
2472 : /*
2473 : * Workhorse for apply_handle_insert()
2474 : * relinfo is for the relation we're actually inserting into
2475 : * (could be a child partition of edata->targetRelInfo)
2476 : */
2477 : static void
2478 152878 : apply_handle_insert_internal(ApplyExecutionData *edata,
2479 : ResultRelInfo *relinfo,
2480 : TupleTableSlot *remoteslot)
2481 : {
2482 152878 : EState *estate = edata->estate;
2483 :
2484 : /* We must open indexes here. */
2485 152878 : ExecOpenIndices(relinfo, true);
2486 152878 : InitConflictIndexes(relinfo);
2487 :
2488 : /* Do the insert. */
2489 152878 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
2490 152866 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2491 :
2492 : /* Cleanup. */
2493 152852 : ExecCloseIndices(relinfo);
2494 152852 : }
2495 :
2496 : /*
2497 : * Check if the logical replication relation is updatable and throw
2498 : * appropriate error if it isn't.
2499 : */
2500 : static void
2501 144562 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2502 : {
2503 : /*
2504 : * For partitioned tables, we only need to care if the target partition is
2505 : * updatable (aka has PK or RI defined for it).
2506 : */
2507 144562 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2508 60 : return;
2509 :
2510 : /* Updatable, no error. */
2511 144502 : if (rel->updatable)
2512 144502 : return;
2513 :
2514 : /*
2515 :      * We are in error mode, so it's fine that this is somewhat slow. It's
2516 :      * better to give the user a correct error.
2517 : */
2518 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2519 : {
2520 0 : ereport(ERROR,
2521 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2522 : errmsg("publisher did not send replica identity column "
2523 : "expected by the logical replication target relation \"%s.%s\"",
2524 : rel->remoterel.nspname, rel->remoterel.relname)));
2525 : }
2526 :
2527 0 : ereport(ERROR,
2528 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2529 : errmsg("logical replication target relation \"%s.%s\" has "
2530 : "neither REPLICA IDENTITY index nor PRIMARY "
2531 : "KEY and published relation does not have "
2532 : "REPLICA IDENTITY FULL",
2533 : rel->remoterel.nspname, rel->remoterel.relname)));
2534 : }
2535 :
2536 : /*
2537 : * Handle UPDATE message.
2538 : *
2539 : * TODO: FDW support
2540 : */
2541 : static void
2542 132314 : apply_handle_update(StringInfo s)
2543 : {
2544 : LogicalRepRelMapEntry *rel;
2545 : LogicalRepRelId relid;
2546 : UserContext ucxt;
2547 : ApplyExecutionData *edata;
2548 : EState *estate;
2549 : LogicalRepTupleData oldtup;
2550 : LogicalRepTupleData newtup;
2551 : bool has_oldtup;
2552 : TupleTableSlot *remoteslot;
2553 : RTEPermissionInfo *target_perminfo;
2554 : MemoryContext oldctx;
2555 : bool run_as_owner;
2556 :
2557 : /*
2558 : * Quick return if we are skipping data modification changes or handling
2559 : * streamed transactions.
2560 : */
2561 264622 : if (is_skipping_changes() ||
2562 132308 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
2563 68446 : return;
2564 :
2565 63868 : begin_replication_step();
2566 :
2567 63866 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2568 : &newtup);
2569 63866 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2570 63866 : if (!should_apply_changes_for_rel(rel))
2571 : {
2572 : /*
2573 : * The relation can't become interesting in the middle of the
2574 : * transaction so it's safe to unlock it.
2575 : */
2576 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2577 0 : end_replication_step();
2578 0 : return;
2579 : }
2580 :
2581 : /* Set relation for error callback */
2582 63866 : apply_error_callback_arg.rel = rel;
2583 :
2584 : /* Check if we can do the update. */
2585 63866 : check_relation_updatable(rel);
2586 :
2587 : /*
2588 : * Make sure that any user-supplied code runs as the table owner, unless
2589 : * the user has opted out of that behavior.
2590 : */
2591 63866 : run_as_owner = MySubscription->runasowner;
2592 63866 : if (!run_as_owner)
2593 63860 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2594 :
2595 : /* Initialize the executor state. */
2596 63864 : edata = create_edata_for_relation(rel);
2597 63864 : estate = edata->estate;
2598 63864 : remoteslot = ExecInitExtraTupleSlot(estate,
2599 63864 : RelationGetDescr(rel->localrel),
2600 : &TTSOpsVirtual);
2601 :
2602 : /*
2603 : * Populate updatedCols so that per-column triggers can fire, and so
2604 : * executor can correctly pass down indexUnchanged hint. This could
2605 : * include more columns than were actually changed on the publisher
2606 : * because the logical replication protocol doesn't contain that
2607 : * information. But it would for example exclude columns that only exist
2608 : * on the subscriber, since we are not touching those.
2609 : */
2610 63864 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
2611 318588 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2612 : {
2613 254724 : Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
2614 254724 : int remoteattnum = rel->attrmap->attnums[i];
2615 :
2616 254724 : if (!att->attisdropped && remoteattnum >= 0)
2617 : {
2618 : Assert(remoteattnum < newtup.ncols);
2619 137694 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2620 137688 : target_perminfo->updatedCols =
2621 137688 : bms_add_member(target_perminfo->updatedCols,
2622 : i + 1 - FirstLowInvalidHeapAttributeNumber);
2623 : }
2624 : }
2625 :
2626 : /* Build the search tuple. */
2627 63864 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2628 63864 : slot_store_data(remoteslot, rel,
2629 63864 : has_oldtup ? &oldtup : &newtup);
2630 63864 : MemoryContextSwitchTo(oldctx);
2631 :
2632 : /* For a partitioned table, apply update to correct partition. */
2633 63864 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2634 26 : apply_handle_tuple_routing(edata,
2635 : remoteslot, &newtup, CMD_UPDATE);
2636 : else
2637 63838 : apply_handle_update_internal(edata, edata->targetRelInfo,
2638 : remoteslot, &newtup, rel->localindexoid);
2639 :
2640 63854 : finish_edata(edata);
2641 :
2642 : /* Reset relation for error callback */
2643 63854 : apply_error_callback_arg.rel = NULL;
2644 :
2645 63854 : if (!run_as_owner)
2646 63850 : RestoreUserContext(&ucxt);
2647 :
2648 63854 : logicalrep_rel_close(rel, NoLock);
2649 :
2650 63854 : end_replication_step();
2651 : }
2652 :
2653 : /*
2654 : * Workhorse for apply_handle_update()
2655 : * relinfo is for the relation we're actually updating in
2656 : * (could be a child partition of edata->targetRelInfo)
2657 : */
2658 : static void
2659 63838 : apply_handle_update_internal(ApplyExecutionData *edata,
2660 : ResultRelInfo *relinfo,
2661 : TupleTableSlot *remoteslot,
2662 : LogicalRepTupleData *newtup,
2663 : Oid localindexoid)
2664 : {
2665 63838 : EState *estate = edata->estate;
2666 63838 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2667 63838 : Relation localrel = relinfo->ri_RelationDesc;
2668 : EPQState epqstate;
2669 : TupleTableSlot *localslot;
2670 : bool found;
2671 : MemoryContext oldctx;
2672 :
2673 63838 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2674 63838 : ExecOpenIndices(relinfo, true);
2675 :
2676 63838 : found = FindReplTupleInLocalRel(edata, localrel,
2677 : &relmapentry->remoterel,
2678 : localindexoid,
2679 : remoteslot, &localslot);
2680 :
2681 : /*
2682 : * Tuple found.
2683 : *
2684 : * Note this will fail if there are other conflicting unique indexes.
2685 : */
2686 63830 : if (found)
2687 : {
2688 : RepOriginId localorigin;
2689 : TransactionId localxmin;
2690 : TimestampTz localts;
2691 :
2692 : /*
2693 : * Report the conflict if the tuple was modified by a different
2694 : * origin.
2695 : */
2696 63824 : if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
2697 4 : localorigin != replorigin_session_origin)
2698 : {
2699 : TupleTableSlot *newslot;
2700 :
2701 : /* Store the new tuple for conflict reporting */
2702 4 : newslot = table_slot_create(localrel, &estate->es_tupleTable);
2703 4 : slot_store_data(newslot, relmapentry, newtup);
2704 :
2705 4 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
2706 : remoteslot, localslot, newslot,
2707 : InvalidOid, localxmin, localorigin, localts);
2708 : }
2709 :
2710 : /* Process and store remote tuple in the slot */
2711 63824 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2712 63824 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2713 63824 : MemoryContextSwitchTo(oldctx);
2714 :
2715 63824 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2716 :
2717 63824 : InitConflictIndexes(relinfo);
2718 :
2719 : /* Do the actual update. */
2720 63824 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
2721 63824 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2722 : remoteslot);
2723 : }
2724 : else
2725 : {
2726 6 : TupleTableSlot *newslot = localslot;
2727 :
2728 : /* Store the new tuple for conflict reporting */
2729 6 : slot_store_data(newslot, relmapentry, newtup);
2730 :
2731 : /*
2732 : * The tuple to be updated could not be found. Do nothing except for
2733 : * emitting a log message.
2734 : */
2735 6 : ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
2736 : remoteslot, NULL, newslot,
2737 : InvalidOid, InvalidTransactionId,
2738 : InvalidRepOriginId, 0);
2739 : }
2740 :
2741 : /* Cleanup. */
2742 63828 : ExecCloseIndices(relinfo);
2743 63828 : EvalPlanQualEnd(&epqstate);
2744 63828 : }
2745 :
2746 : /*
2747 : * Handle DELETE message.
2748 : *
2749 : * TODO: FDW support
2750 : */
2751 : static void
2752 163866 : apply_handle_delete(StringInfo s)
2753 : {
2754 : LogicalRepRelMapEntry *rel;
2755 : LogicalRepTupleData oldtup;
2756 : LogicalRepRelId relid;
2757 : UserContext ucxt;
2758 : ApplyExecutionData *edata;
2759 : EState *estate;
2760 : TupleTableSlot *remoteslot;
2761 : MemoryContext oldctx;
2762 : bool run_as_owner;
2763 :
2764 : /*
2765 : * Quick return if we are skipping data modification changes or handling
2766 : * streamed transactions.
2767 : */
2768 327732 : if (is_skipping_changes() ||
2769 163866 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
2770 83230 : return;
2771 :
2772 80636 : begin_replication_step();
2773 :
2774 80636 : relid = logicalrep_read_delete(s, &oldtup);
2775 80636 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2776 80636 : if (!should_apply_changes_for_rel(rel))
2777 : {
2778 : /*
2779 : * The relation can't become interesting in the middle of the
2780 : * transaction so it's safe to unlock it.
2781 : */
2782 0 : logicalrep_rel_close(rel, RowExclusiveLock);
2783 0 : end_replication_step();
2784 0 : return;
2785 : }
2786 :
2787 : /* Set relation for error callback */
2788 80636 : apply_error_callback_arg.rel = rel;
2789 :
2790 : /* Check if we can do the delete. */
2791 80636 : check_relation_updatable(rel);
2792 :
2793 : /*
2794 : * Make sure that any user-supplied code runs as the table owner, unless
2795 : * the user has opted out of that behavior.
2796 : */
2797 80636 : run_as_owner = MySubscription->runasowner;
2798 80636 : if (!run_as_owner)
2799 80632 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2800 :
2801 : /* Initialize the executor state. */
2802 80636 : edata = create_edata_for_relation(rel);
2803 80636 : estate = edata->estate;
2804 80636 : remoteslot = ExecInitExtraTupleSlot(estate,
2805 80636 : RelationGetDescr(rel->localrel),
2806 : &TTSOpsVirtual);
2807 :
2808 : /* Build the search tuple. */
2809 80636 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2810 80636 : slot_store_data(remoteslot, rel, &oldtup);
2811 80636 : MemoryContextSwitchTo(oldctx);
2812 :
2813 : /* For a partitioned table, apply delete to correct partition. */
2814 80636 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2815 34 : apply_handle_tuple_routing(edata,
2816 : remoteslot, NULL, CMD_DELETE);
2817 : else
2818 80602 : apply_handle_delete_internal(edata, edata->targetRelInfo,
2819 : remoteslot, rel->localindexoid);
2820 :
2821 80636 : finish_edata(edata);
2822 :
2823 : /* Reset relation for error callback */
2824 80636 : apply_error_callback_arg.rel = NULL;
2825 :
2826 80636 : if (!run_as_owner)
2827 80632 : RestoreUserContext(&ucxt);
2828 :
2829 80636 : logicalrep_rel_close(rel, NoLock);
2830 :
2831 80636 : end_replication_step();
2832 : }
2833 :
2834 : /*
2835 : * Workhorse for apply_handle_delete()
2836 : * relinfo is for the relation we're actually deleting from
2837 : * (could be a child partition of edata->targetRelInfo)
2838 : */
2839 : static void
2840 80636 : apply_handle_delete_internal(ApplyExecutionData *edata,
2841 : ResultRelInfo *relinfo,
2842 : TupleTableSlot *remoteslot,
2843 : Oid localindexoid)
2844 : {
2845 80636 : EState *estate = edata->estate;
2846 80636 : Relation localrel = relinfo->ri_RelationDesc;
2847 80636 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2848 : EPQState epqstate;
2849 : TupleTableSlot *localslot;
2850 : bool found;
2851 :
2852 80636 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
2853 80636 : ExecOpenIndices(relinfo, false);
2854 :
2855 80636 : found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2856 : remoteslot, &localslot);
2857 :
2858 : /* If found delete it. */
2859 80636 : if (found)
2860 : {
2861 : RepOriginId localorigin;
2862 : TransactionId localxmin;
2863 : TimestampTz localts;
2864 :
2865 : /*
2866 : * Report the conflict if the tuple was modified by a different
2867 : * origin.
2868 : */
2869 80618 : if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
2870 6 : localorigin != replorigin_session_origin)
2871 4 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
2872 : remoteslot, localslot, NULL,
2873 : InvalidOid, localxmin, localorigin, localts);
2874 :
2875 80618 : EvalPlanQualSetSlot(&epqstate, localslot);
2876 :
2877 : /* Do the actual delete. */
2878 80618 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
2879 80618 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2880 : }
2881 : else
2882 : {
2883 : /*
2884 : * The tuple to be deleted could not be found. Do nothing except for
2885 : * emitting a log message.
2886 : */
2887 18 : ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
2888 : remoteslot, NULL, NULL,
2889 : InvalidOid, InvalidTransactionId,
2890 : InvalidRepOriginId, 0);
2891 : }
2892 :
2893 : /* Cleanup. */
2894 80636 : ExecCloseIndices(relinfo);
2895 80636 : EvalPlanQualEnd(&epqstate);
2896 80636 : }
2897 :
2898 : /*
2899 : * Try to find a tuple received from the publication side (in 'remoteslot') in
2900 :  * the corresponding local relation using either the replica identity index,
2901 :  * the primary key, another usable index or, if needed, a sequential scan.
2902 : *
2903 : * Local tuple, if found, is returned in '*localslot'.
2904 : */
2905 : static bool
2906 144500 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
2907 : LogicalRepRelation *remoterel,
2908 : Oid localidxoid,
2909 : TupleTableSlot *remoteslot,
2910 : TupleTableSlot **localslot)
2911 : {
2912 144500 : EState *estate = edata->estate;
2913 : bool found;
2914 :
2915 : /*
2916 : * Regardless of the top-level operation, we're performing a read here, so
2917 : * check for SELECT privileges.
2918 : */
2919 144500 : TargetPrivilegesCheck(localrel, ACL_SELECT);
2920 :
2921 144492 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2922 :
2923 : Assert(OidIsValid(localidxoid) ||
2924 : (remoterel->replident == REPLICA_IDENTITY_FULL));
2925 :
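     :     /*
     :      * Use an index scan when a usable index is available; otherwise (which
     :      * is possible only with REPLICA IDENTITY FULL, per the assertion above)
     :      * fall back to scanning the whole relation.
     :      */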
2926 144492 : if (OidIsValid(localidxoid))
2927 : {
2928 : #ifdef USE_ASSERT_CHECKING
2929 : Relation idxrel = index_open(localidxoid, AccessShareLock);
2930 :
2931 : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2932 : Assert(GetRelationIdentityOrPK(localrel) == localidxoid ||
2933 : (remoterel->replident == REPLICA_IDENTITY_FULL &&
2934 : IsIndexUsableForReplicaIdentityFull(idxrel,
2935 : edata->targetRel->attrmap)));
2936 : index_close(idxrel, AccessShareLock);
2937 : #endif
2938 :
2939 144194 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
2940 : LockTupleExclusive,
2941 : remoteslot, *localslot);
2942 : }
2943 : else
2944 298 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2945 : remoteslot, *localslot);
2946 :
2947 144492 : return found;
2948 : }
2949 :
2950 : /*
2951 : * This handles insert, update, delete on a partitioned table.
2952 : */
2953 : static void
2954 148 : apply_handle_tuple_routing(ApplyExecutionData *edata,
2955 : TupleTableSlot *remoteslot,
2956 : LogicalRepTupleData *newtup,
2957 : CmdType operation)
2958 : {
2959 148 : EState *estate = edata->estate;
2960 148 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2961 148 : ResultRelInfo *relinfo = edata->targetRelInfo;
2962 148 : Relation parentrel = relinfo->ri_RelationDesc;
2963 : ModifyTableState *mtstate;
2964 : PartitionTupleRouting *proute;
2965 : ResultRelInfo *partrelinfo;
2966 : Relation partrel;
2967 : TupleTableSlot *remoteslot_part;
2968 : TupleConversionMap *map;
2969 : MemoryContext oldctx;
2970 148 : LogicalRepRelMapEntry *part_entry = NULL;
2971 148 : AttrMap *attrmap = NULL;
2972 :
2973 : /* ModifyTableState is needed for ExecFindPartition(). */
2974 148 : edata->mtstate = mtstate = makeNode(ModifyTableState);
2975 148 : mtstate->ps.plan = NULL;
2976 148 : mtstate->ps.state = estate;
2977 148 : mtstate->operation = operation;
2978 148 : mtstate->resultRelInfo = relinfo;
2979 :
2980 : /* ... as is PartitionTupleRouting. */
2981 148 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2982 :
2983 : /*
2984 : * Find the partition to which the "search tuple" belongs.
2985 : */
2986 : Assert(remoteslot != NULL);
2987 148 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2988 148 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2989 : remoteslot, estate);
2990 : Assert(partrelinfo != NULL);
2991 148 : partrel = partrelinfo->ri_RelationDesc;
2992 :
2993 : /*
2994 : * Check for supported relkind. We need this since partitions might be of
2995 : * unsupported relkinds; and the set of partitions can change, so checking
2996 : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2997 : */
2998 148 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
2999 148 : get_namespace_name(RelationGetNamespace(partrel)),
3000 148 : RelationGetRelationName(partrel));
3001 :
3002 : /*
3003 : * To perform any of the operations below, the tuple must match the
3004 : * partition's rowtype. Convert if needed or just copy, using a dedicated
3005 : * slot to store the tuple in any case.
3006 : */
3007 148 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
3008 148 : if (remoteslot_part == NULL)
3009 82 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
3010 148 : map = ExecGetRootToChildMap(partrelinfo, estate);
3011 148 : if (map != NULL)
3012 : {
3013 66 : attrmap = map->attrMap;
3014 66 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
3015 : remoteslot_part);
3016 : }
3017 : else
3018 : {
3019 82 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
3020 82 : slot_getallattrs(remoteslot_part);
3021 : }
3022 148 : MemoryContextSwitchTo(oldctx);
3023 :
3024 : /* Check if we can do the update or delete on the leaf partition. */
3025 148 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
3026 : {
3027 60 : part_entry = logicalrep_partition_open(relmapentry, partrel,
3028 : attrmap);
3029 60 : check_relation_updatable(part_entry);
3030 : }
3031 :
3032 148 : switch (operation)
3033 : {
3034 88 : case CMD_INSERT:
3035 88 : apply_handle_insert_internal(edata, partrelinfo,
3036 : remoteslot_part);
3037 88 : break;
3038 :
3039 34 : case CMD_DELETE:
3040 34 : apply_handle_delete_internal(edata, partrelinfo,
3041 : remoteslot_part,
3042 : part_entry->localindexoid);
3043 34 : break;
3044 :
3045 26 : case CMD_UPDATE:
3046 :
3047 : /*
3048 : * For UPDATE, depending on whether or not the updated tuple
3049 : * satisfies the partition's constraint, perform a simple UPDATE
3050 : * of the partition or move the updated tuple into a different
3051 : * suitable partition.
3052 : */
3053 : {
3054 : TupleTableSlot *localslot;
3055 : ResultRelInfo *partrelinfo_new;
3056 : Relation partrel_new;
3057 : bool found;
3058 : EPQState epqstate;
3059 : RepOriginId localorigin;
3060 : TransactionId localxmin;
3061 : TimestampTz localts;
3062 :
3063 : /* Get the matching local tuple from the partition. */
3064 26 : found = FindReplTupleInLocalRel(edata, partrel,
3065 : &part_entry->remoterel,
3066 : part_entry->localindexoid,
3067 : remoteslot_part, &localslot);
3068 26 : if (!found)
3069 : {
3070 4 : TupleTableSlot *newslot = localslot;
3071 :
3072 : /* Store the new tuple for conflict reporting */
3073 4 : slot_store_data(newslot, part_entry, newtup);
3074 :
3075 : /*
3076 : * The tuple to be updated could not be found. Do nothing
3077 : * except for emitting a log message.
3078 : */
3079 4 : ReportApplyConflict(estate, partrelinfo,
3080 : LOG, CT_UPDATE_MISSING,
3081 : remoteslot_part, NULL, newslot,
3082 : InvalidOid, InvalidTransactionId,
3083 : InvalidRepOriginId, 0);
3084 :
3085 4 : return;
3086 : }
3087 :
3088 : /*
3089 : * Report the conflict if the tuple was modified by a
3090 : * different origin.
3091 : */
3092 22 : if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
3093 2 : localorigin != replorigin_session_origin)
3094 : {
3095 : TupleTableSlot *newslot;
3096 :
3097 : /* Store the new tuple for conflict reporting */
3098 2 : newslot = table_slot_create(partrel, &estate->es_tupleTable);
3099 2 : slot_store_data(newslot, part_entry, newtup);
3100 :
3101 2 : ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
3102 : remoteslot_part, localslot, newslot,
3103 : InvalidOid, localxmin, localorigin,
3104 : localts);
3105 : }
3106 :
3107 : /*
3108 : * Apply the update to the local tuple, putting the result in
3109 : * remoteslot_part.
3110 : */
3111 22 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3112 22 : slot_modify_data(remoteslot_part, localslot, part_entry,
3113 : newtup);
3114 22 : MemoryContextSwitchTo(oldctx);
3115 :
3116 22 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
3117 :
3118 : /*
3119 : * Does the updated tuple still satisfy the current
3120 : * partition's constraint?
3121 : */
3122 44 : if (!partrel->rd_rel->relispartition ||
3123 22 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3124 : false))
3125 : {
3126 : /*
3127 : * Yes, so simply UPDATE the partition. We don't call
3128 : * apply_handle_update_internal() here, which would
3129 : * normally do the following work, to avoid repeating some
3130 : * work already done above to find the local tuple in the
3131 : * partition.
3132 : */
3133 20 : ExecOpenIndices(partrelinfo, true);
3134 20 : InitConflictIndexes(partrelinfo);
3135 :
3136 20 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
3137 20 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3138 : ACL_UPDATE);
3139 20 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3140 : localslot, remoteslot_part);
3141 : }
3142 : else
3143 : {
3144 : /* Move the tuple into the new partition. */
3145 :
3146 : /*
3147 : * New partition will be found using tuple routing, which
3148 : * can only occur via the parent table. We might need to
3149 : * convert the tuple to the parent's rowtype. Note that
3150 : * this is the tuple found in the partition, not the
3151 : * original search tuple received by this function.
3152 : */
3153 2 : if (map)
3154 : {
3155 : TupleConversionMap *PartitionToRootMap =
3156 2 : convert_tuples_by_name(RelationGetDescr(partrel),
3157 : RelationGetDescr(parentrel));
3158 :
3159 : remoteslot =
3160 2 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3161 : remoteslot_part, remoteslot);
3162 : }
3163 : else
3164 : {
3165 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3166 0 : slot_getallattrs(remoteslot);
3167 : }
3168 :
3169 : /* Find the new partition. */
3170 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3171 2 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3172 : proute, remoteslot,
3173 : estate);
3174 2 : MemoryContextSwitchTo(oldctx);
3175 : Assert(partrelinfo_new != partrelinfo);
3176 2 : partrel_new = partrelinfo_new->ri_RelationDesc;
3177 :
3178 : /* Check that the new partition also has a supported relkind. */
3179 2 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3180 2 : get_namespace_name(RelationGetNamespace(partrel_new)),
3181 2 : RelationGetRelationName(partrel_new));
3182 :
3183 2 : ExecOpenIndices(partrelinfo, false);
3184 :
3185 : /* DELETE old tuple found in the old partition. */
3186 2 : EvalPlanQualSetSlot(&epqstate, localslot);
3187 2 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc, ACL_DELETE);
3188 2 : ExecSimpleRelationDelete(partrelinfo, estate, &epqstate, localslot);
3189 :
3190 : /* INSERT new tuple into the new partition. */
3191 :
3192 : /*
3193 : * Convert the replacement tuple to match the destination
3194 : * partition rowtype.
3195 : */
3196 2 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3197 2 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
3198 2 : if (remoteslot_part == NULL)
3199 2 : remoteslot_part = table_slot_create(partrel_new,
3200 : &estate->es_tupleTable);
3201 2 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
3202 2 : if (map != NULL)
3203 : {
3204 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3205 : remoteslot,
3206 : remoteslot_part);
3207 : }
3208 : else
3209 : {
3210 2 : remoteslot_part = ExecCopySlot(remoteslot_part,
3211 : remoteslot);
3212 2 : slot_getallattrs(remoteslot);
3213 : }
3214 2 : MemoryContextSwitchTo(oldctx);
3215 2 : apply_handle_insert_internal(edata, partrelinfo_new,
3216 : remoteslot_part);
3217 : }
3218 :
3219 22 : ExecCloseIndices(partrelinfo);
3220 22 : EvalPlanQualEnd(&epqstate);
3221 : }
3222 22 : break;
3223 :
3224 0 : default:
3225 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3226 : break;
3227 : }
3228 : }
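/*
 * Illustrative, self-contained sketch (not part of worker.c) of the
 * CMD_UPDATE handling in apply_handle_tuple_routing() above: if the
 * updated row still satisfies the current partition's constraint it is
 * updated in place, otherwise it is deleted from the old partition and
 * re-inserted into the partition selected by tuple routing.  Every name
 * below (demo_partition, demo_route, ...) is hypothetical and only
 * mimics the control flow described above.
 */
#include <stdbool.h>
#include <stdio.h>

#define DEMO_NPARTS 2
#define DEMO_CAP 8

typedef struct demo_partition
{
	int			lo;				/* partition accepts keys in [lo, hi) */
	int			hi;
	int			nrows;
	int			keys[DEMO_CAP];
} demo_partition;

static demo_partition parts[DEMO_NPARTS] = {
	{0, 100, 1, {42}},			/* the row with key 42 lives here */
	{100, 200, 0, {0}},
};

/* "Tuple routing": find the partition whose range contains the key. */
static demo_partition *
demo_route(int key)
{
	for (int i = 0; i < DEMO_NPARTS; i++)
		if (key >= parts[i].lo && key < parts[i].hi)
			return &parts[i];
	return NULL;
}

static void
demo_update(demo_partition *cur, int idx, int newkey)
{
	if (newkey >= cur->lo && newkey < cur->hi)
	{
		/* still satisfies the partition constraint: simple UPDATE */
		cur->keys[idx] = newkey;
	}
	else
	{
		/* DELETE the old row from the current partition ... */
		cur->keys[idx] = cur->keys[--cur->nrows];

		/* ... and INSERT the new row into the routed partition */
		demo_partition *dest = demo_route(newkey);

		dest->keys[dest->nrows++] = newkey;
	}
}

int
main(void)
{
	demo_update(&parts[0], 0, 150);	/* moves the row across partitions */
	printf("part0=%d rows, part1=%d rows\n", parts[0].nrows, parts[1].nrows);
	return 0;
}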
3229 :
3230 : /*
3231 : * Handle TRUNCATE message.
3232 : *
3233 : * TODO: FDW support
3234 : */
3235 : static void
3236 38 : apply_handle_truncate(StringInfo s)
3237 : {
3238 38 : bool cascade = false;
3239 38 : bool restart_seqs = false;
3240 38 : List *remote_relids = NIL;
3241 38 : List *remote_rels = NIL;
3242 38 : List *rels = NIL;
3243 38 : List *part_rels = NIL;
3244 38 : List *relids = NIL;
3245 38 : List *relids_logged = NIL;
3246 : ListCell *lc;
3247 38 : LOCKMODE lockmode = AccessExclusiveLock;
3248 :
3249 : /*
3250 : * Quick return if we are skipping data modification changes or handling
3251 : * streamed transactions.
3252 : */
3253 76 : if (is_skipping_changes() ||
3254 38 : handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
3255 0 : return;
3256 :
3257 38 : begin_replication_step();
3258 :
3259 38 : remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3260 :
3261 94 : foreach(lc, remote_relids)
3262 : {
3263 56 : LogicalRepRelId relid = lfirst_oid(lc);
3264 : LogicalRepRelMapEntry *rel;
3265 :
3266 56 : rel = logicalrep_rel_open(relid, lockmode);
3267 56 : if (!should_apply_changes_for_rel(rel))
3268 : {
3269 : /*
3270 : * The relation can't become interesting in the middle of the
3271 : * transaction so it's safe to unlock it.
3272 : */
3273 0 : logicalrep_rel_close(rel, lockmode);
3274 0 : continue;
3275 : }
3276 :
3277 56 : remote_rels = lappend(remote_rels, rel);
3278 56 : TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
3279 56 : rels = lappend(rels, rel->localrel);
3280 56 : relids = lappend_oid(relids, rel->localreloid);
3281 56 : if (RelationIsLogicallyLogged(rel->localrel))
3282 0 : relids_logged = lappend_oid(relids_logged, rel->localreloid);
3283 :
3284 : /*
3285 : * Truncate partitions if we got a message to truncate a partitioned
3286 : * table.
3287 : */
3288 56 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3289 : {
3290 : ListCell *child;
3291 8 : List *children = find_all_inheritors(rel->localreloid,
3292 : lockmode,
3293 : NULL);
3294 :
3295 30 : foreach(child, children)
3296 : {
3297 22 : Oid childrelid = lfirst_oid(child);
3298 : Relation childrel;
3299 :
3300 22 : if (list_member_oid(relids, childrelid))
3301 8 : continue;
3302 :
3303 : /* find_all_inheritors already got lock */
3304 14 : childrel = table_open(childrelid, NoLock);
3305 :
3306 : /*
3307 : * Ignore temp tables of other backends. See similar code in
3308 : * ExecuteTruncate().
3309 : */
3310 14 : if (RELATION_IS_OTHER_TEMP(childrel))
3311 : {
3312 0 : table_close(childrel, lockmode);
3313 0 : continue;
3314 : }
3315 :
3316 14 : TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
3317 14 : rels = lappend(rels, childrel);
3318 14 : part_rels = lappend(part_rels, childrel);
3319 14 : relids = lappend_oid(relids, childrelid);
3320 : /* Log this relation only if needed for logical decoding */
3321 14 : if (RelationIsLogicallyLogged(childrel))
3322 0 : relids_logged = lappend_oid(relids_logged, childrelid);
3323 : }
3324 : }
3325 : }
3326 :
3327 : /*
3328 : * Even if we used CASCADE on the upstream primary, we explicitly default
3329 : * to replaying changes without further cascading. This might later be
3330 : * made changeable with a user-specified option.
3331 : *
3332 : * MySubscription->runasowner tells us whether we want to execute
3333 : * replication actions as the subscription owner; the last argument to
3334 : * TruncateGuts tells it whether we want to switch to the table owner.
3335 : * Those are exactly opposite conditions.
3336 : */
3337 38 : ExecuteTruncateGuts(rels,
3338 : relids,
3339 : relids_logged,
3340 : DROP_RESTRICT,
3341 : restart_seqs,
3342 38 : !MySubscription->runasowner);
3343 94 : foreach(lc, remote_rels)
3344 : {
3345 56 : LogicalRepRelMapEntry *rel = lfirst(lc);
3346 :
3347 56 : logicalrep_rel_close(rel, NoLock);
3348 : }
3349 52 : foreach(lc, part_rels)
3350 : {
3351 14 : Relation rel = lfirst(lc);
3352 :
3353 14 : table_close(rel, NoLock);
3354 : }
3355 :
3356 38 : end_replication_step();
3357 : }
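/*
 * Illustrative, self-contained sketch (not part of worker.c) of the
 * relation-collection step in apply_handle_truncate() above: each
 * partitioned table named in the remote TRUNCATE is expanded into its
 * children, and relations that are already in the list are skipped so
 * that nothing is truncated twice.  The tiny "catalog" below is
 * hypothetical; only the expand-and-deduplicate pattern mirrors the code.
 */
#include <stdbool.h>
#include <stdio.h>

typedef unsigned int demo_oid;

static bool
demo_member(const demo_oid *list, int n, demo_oid oid)
{
	for (int i = 0; i < n; i++)
		if (list[i] == oid)
			return true;
	return false;
}

int
main(void)
{
	/* remote TRUNCATE names a leaf (11) and its partitioned parent (10) */
	demo_oid	remote[] = {11, 10};
	/* hypothetical children of parent 10 */
	demo_oid	children_of_10[] = {11, 12};

	demo_oid	relids[8];
	int			nrelids = 0;

	for (int i = 0; i < 2; i++)
	{
		relids[nrelids++] = remote[i];

		if (remote[i] == 10)	/* "relkind == RELKIND_PARTITIONED_TABLE" */
		{
			for (int j = 0; j < 2; j++)
			{
				if (demo_member(relids, nrelids, children_of_10[j]))
					continue;	/* already listed, e.g. leaf 11 */
				relids[nrelids++] = children_of_10[j];
			}
		}
	}

	for (int i = 0; i < nrelids; i++)
		printf("truncate oid %u\n", relids[i]);	/* 11, 10, 12 */
	return 0;
}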
3358 :
3359 :
3360 : /*
3361 : * Logical replication protocol message dispatcher.
3362 : */
3363 : void
3364 675710 : apply_dispatch(StringInfo s)
3365 : {
3366 675710 : LogicalRepMsgType action = pq_getmsgbyte(s);
3367 : LogicalRepMsgType saved_command;
3368 :
3369 : /*
3370 : * Set the current command being applied. Since this function can be
3371 : * called recursively when applying spooled changes, save the current
3372 : * command.
3373 : */
3374 675710 : saved_command = apply_error_callback_arg.command;
3375 675710 : apply_error_callback_arg.command = action;
3376 :
3377 675710 : switch (action)
3378 : {
3379 886 : case LOGICAL_REP_MSG_BEGIN:
3380 886 : apply_handle_begin(s);
3381 886 : break;
3382 :
3383 834 : case LOGICAL_REP_MSG_COMMIT:
3384 834 : apply_handle_commit(s);
3385 834 : break;
3386 :
3387 373010 : case LOGICAL_REP_MSG_INSERT:
3388 373010 : apply_handle_insert(s);
3389 372968 : break;
3390 :
3391 132314 : case LOGICAL_REP_MSG_UPDATE:
3392 132314 : apply_handle_update(s);
3393 132300 : break;
3394 :
3395 163866 : case LOGICAL_REP_MSG_DELETE:
3396 163866 : apply_handle_delete(s);
3397 163866 : break;
3398 :
3399 38 : case LOGICAL_REP_MSG_TRUNCATE:
3400 38 : apply_handle_truncate(s);
3401 38 : break;
3402 :
3403 860 : case LOGICAL_REP_MSG_RELATION:
3404 860 : apply_handle_relation(s);
3405 860 : break;
3406 :
3407 36 : case LOGICAL_REP_MSG_TYPE:
3408 36 : apply_handle_type(s);
3409 36 : break;
3410 :
3411 16 : case LOGICAL_REP_MSG_ORIGIN:
3412 16 : apply_handle_origin(s);
3413 16 : break;
3414 :
3415 0 : case LOGICAL_REP_MSG_MESSAGE:
3416 :
3417 : /*
3418 : * Logical replication does not use generic logical messages yet,
3419 : * although they could be used by other applications that use this
3420 : * output plugin.
3421 : */
3422 0 : break;
3423 :
3424 1760 : case LOGICAL_REP_MSG_STREAM_START:
3425 1760 : apply_handle_stream_start(s);
3426 1760 : break;
3427 :
3428 1758 : case LOGICAL_REP_MSG_STREAM_STOP:
3429 1758 : apply_handle_stream_stop(s);
3430 1754 : break;
3431 :
3432 76 : case LOGICAL_REP_MSG_STREAM_ABORT:
3433 76 : apply_handle_stream_abort(s);
3434 76 : break;
3435 :
3436 122 : case LOGICAL_REP_MSG_STREAM_COMMIT:
3437 122 : apply_handle_stream_commit(s);
3438 118 : break;
3439 :
3440 32 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3441 32 : apply_handle_begin_prepare(s);
3442 32 : break;
3443 :
3444 30 : case LOGICAL_REP_MSG_PREPARE:
3445 30 : apply_handle_prepare(s);
3446 28 : break;
3447 :
3448 40 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
3449 40 : apply_handle_commit_prepared(s);
3450 40 : break;
3451 :
3452 10 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
3453 10 : apply_handle_rollback_prepared(s);
3454 10 : break;
3455 :
3456 22 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3457 22 : apply_handle_stream_prepare(s);
3458 22 : break;
3459 :
3460 0 : default:
3461 0 : ereport(ERROR,
3462 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3463 : errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3464 : }
3465 :
3466 : /* Reset the current command */
3467 675644 : apply_error_callback_arg.command = saved_command;
3468 675644 : }
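/*
 * Illustrative, self-contained sketch (not part of worker.c) of the
 * dispatch pattern used by apply_dispatch() above: the first byte of the
 * message selects the handler, and the "current command" is saved and
 * restored around the switch because the dispatcher can be re-entered
 * while spooled changes are being applied.  Handler names are
 * hypothetical stand-ins for the apply_handle_* routines.
 */
#include <stdio.h>

static char current_command = 0;	/* stands in for apply_error_callback_arg.command */

static void demo_handle_begin(void)  { puts("BEGIN"); }
static void demo_handle_insert(void) { puts("INSERT"); }
static void demo_handle_commit(void) { puts("COMMIT"); }

static void
demo_dispatch(const char *msg)
{
	char		action = msg[0];	/* message type byte */
	char		saved_command = current_command;

	current_command = action;

	switch (action)
	{
		case 'B':
			demo_handle_begin();
			break;
		case 'I':
			demo_handle_insert();
			break;
		case 'C':
			demo_handle_commit();
			break;
		default:
			fprintf(stderr, "invalid message type %d\n", action);
			break;
	}

	/* Reset the current command */
	current_command = saved_command;
}

int
main(void)
{
	demo_dispatch("B");
	demo_dispatch("I");
	demo_dispatch("C");
	return 0;
}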
3469 :
3470 : /*
3471 : * Figure out which write/flush positions to report to the walsender process.
3472 : *
3473 : * We can't simply report back the last LSN the walsender sent us because the
3474 : * local transaction might not yet be flushed to disk locally. Instead we
3475 : * build a list that associates local with remote LSNs for every commit. When
3476 : * reporting back the flush position to the sender we iterate that list and
3477 : * check which entries on it are already locally flushed. Those we can report
3478 : * as having been flushed.
3479 : *
3480 : * The have_pending_txes is true if there are outstanding transactions that
3481 : * need to be flushed.
3482 : */
3483 : static void
3484 85662 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3485 : bool *have_pending_txes)
3486 : {
3487 : dlist_mutable_iter iter;
3488 85662 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3489 :
3490 85662 : *write = InvalidXLogRecPtr;
3491 85662 : *flush = InvalidXLogRecPtr;
3492 :
3493 86596 : dlist_foreach_modify(iter, &lsn_mapping)
3494 : {
3495 37644 : FlushPosition *pos =
3496 37644 : dlist_container(FlushPosition, node, iter.cur);
3497 :
3498 37644 : *write = pos->remote_end;
3499 :
3500 37644 : if (pos->local_end <= local_flush)
3501 : {
3502 934 : *flush = pos->remote_end;
3503 934 : dlist_delete(iter.cur);
3504 934 : pfree(pos);
3505 : }
3506 : else
3507 : {
3508 : /*
3509 : * Don't want to uselessly iterate over the rest of the list which
3510 : * could potentially be long. Instead get the last element and
3511 : * grab the write position from there.
3512 : */
3513 36710 : pos = dlist_tail_element(FlushPosition, node,
3514 : &lsn_mapping);
3515 36710 : *write = pos->remote_end;
3516 36710 : *have_pending_txes = true;
3517 36710 : return;
3518 : }
3519 : }
3520 :
3521 48952 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3522 : }
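/*
 * Illustrative, self-contained sketch (not part of worker.c) of the
 * remote/local LSN mapping described above: each commit appends a
 * (local_end, remote_end) pair, and when computing feedback we report as
 * flushed the remote LSN of the newest entry whose local LSN is already
 * durable, removing the consumed entries.  The list type and LSNs below
 * are simplified stand-ins for the dlist of FlushPosition entries.
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct demo_pos
{
	uint64_t	local_end;
	uint64_t	remote_end;
	struct demo_pos *next;
} demo_pos;

static demo_pos *head;
static demo_pos **tail = &head;

static void
demo_store(uint64_t remote_lsn, uint64_t local_lsn)
{
	demo_pos   *p = malloc(sizeof(demo_pos));

	p->local_end = local_lsn;
	p->remote_end = remote_lsn;
	p->next = NULL;
	*tail = p;
	tail = &p->next;
}

/* Returns the remote LSN that may be reported as flushed (0 if none). */
static uint64_t
demo_flush_position(uint64_t local_flush)
{
	uint64_t	flush = 0;

	while (head && head->local_end <= local_flush)
	{
		demo_pos   *p = head;

		flush = p->remote_end;
		head = p->next;
		if (head == NULL)
			tail = &head;
		free(p);
	}
	return flush;
}

int
main(void)
{
	demo_store(1000, 10);		/* remote commit at 1000 became local LSN 10 */
	demo_store(2000, 20);
	printf("flushed up to remote %llu\n",
		   (unsigned long long) demo_flush_position(15));	/* -> 1000 */
	return 0;
}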
3523 :
3524 : /*
3525 : * Store current remote/local lsn pair in the tracking list.
3526 : */
3527 : void
3528 1046 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3529 : {
3530 : FlushPosition *flushpos;
3531 :
3532 : /*
3533 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3534 : * by the leader apply worker.
3535 : */
3536 1046 : if (am_parallel_apply_worker())
3537 38 : return;
3538 :
3539 : /* Need to do this in permanent context */
3540 1008 : MemoryContextSwitchTo(ApplyContext);
3541 :
3542 : /* Track commit lsn */
3543 1008 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
3544 1008 : flushpos->local_end = local_lsn;
3545 1008 : flushpos->remote_end = remote_lsn;
3546 :
3547 1008 : dlist_push_tail(&lsn_mapping, &flushpos->node);
3548 1008 : MemoryContextSwitchTo(ApplyMessageContext);
3549 : }
3550 :
3551 :
3552 : /* Update statistics of the worker. */
3553 : static void
3554 380614 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3555 : {
3556 380614 : MyLogicalRepWorker->last_lsn = last_lsn;
3557 380614 : MyLogicalRepWorker->last_send_time = send_time;
3558 380614 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3559 380614 : if (reply)
3560 : {
3561 10994 : MyLogicalRepWorker->reply_lsn = last_lsn;
3562 10994 : MyLogicalRepWorker->reply_time = send_time;
3563 : }
3564 380614 : }
3565 :
3566 : /*
3567 : * Apply main loop.
3568 : */
3569 : static void
3570 706 : LogicalRepApplyLoop(XLogRecPtr last_received)
3571 : {
3572 706 : TimestampTz last_recv_timestamp = GetCurrentTimestamp();
3573 706 : bool ping_sent = false;
3574 : TimeLineID tli;
3575 : ErrorContextCallback errcallback;
3576 :
3577 : /*
3578 : * Init the ApplyMessageContext which we clean up after each replication
3579 : * protocol message.
3580 : */
3581 706 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
3582 : "ApplyMessageContext",
3583 : ALLOCSET_DEFAULT_SIZES);
3584 :
3585 : /*
3586 : * This memory context is used for per-stream data when the streaming mode
3587 : * is enabled. This context is reset on each stream stop.
3588 : */
3589 706 : LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
3590 : "LogicalStreamingContext",
3591 : ALLOCSET_DEFAULT_SIZES);
3592 :
3593 : /* mark as idle, before starting to loop */
3594 706 : pgstat_report_activity(STATE_IDLE, NULL);
3595 :
3596 : /*
3597 : * Push apply error context callback. Fields will be filled while applying
3598 : * a change.
3599 : */
3600 706 : errcallback.callback = apply_error_callback;
3601 706 : errcallback.previous = error_context_stack;
3602 706 : error_context_stack = &errcallback;
3603 706 : apply_error_context_stack = error_context_stack;
3604 :
3605 : /* This outer loop iterates once per wait. */
3606 : for (;;)
3607 73784 : {
3608 74490 : pgsocket fd = PGINVALID_SOCKET;
3609 : int rc;
3610 : int len;
3611 74490 : char *buf = NULL;
3612 74490 : bool endofstream = false;
3613 : long wait_time;
3614 :
3615 74490 : CHECK_FOR_INTERRUPTS();
3616 :
3617 74490 : MemoryContextSwitchTo(ApplyMessageContext);
3618 :
3619 74490 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3620 :
3621 74454 : if (len != 0)
3622 : {
3623 : /* Loop to process all available data (without blocking). */
3624 : for (;;)
3625 : {
3626 453308 : CHECK_FOR_INTERRUPTS();
3627 :
3628 453308 : if (len == 0)
3629 : {
3630 72680 : break;
3631 : }
3632 380628 : else if (len < 0)
3633 : {
3634 14 : ereport(LOG,
3635 : (errmsg("data stream from publisher has ended")));
3636 14 : endofstream = true;
3637 14 : break;
3638 : }
3639 : else
3640 : {
3641 : int c;
3642 : StringInfoData s;
3643 :
3644 380614 : if (ConfigReloadPending)
3645 : {
3646 0 : ConfigReloadPending = false;
3647 0 : ProcessConfigFile(PGC_SIGHUP);
3648 : }
3649 :
3650 : /* Reset timeout. */
3651 380614 : last_recv_timestamp = GetCurrentTimestamp();
3652 380614 : ping_sent = false;
3653 :
3654 : /* Ensure we are reading the data into our memory context. */
3655 380614 : MemoryContextSwitchTo(ApplyMessageContext);
3656 :
3657 380614 : initReadOnlyStringInfo(&s, buf, len);
3658 :
3659 380614 : c = pq_getmsgbyte(&s);
3660 :
3661 380614 : if (c == 'w')
3662 : {
3663 : XLogRecPtr start_lsn;
3664 : XLogRecPtr end_lsn;
3665 : TimestampTz send_time;
3666 :
3667 369620 : start_lsn = pq_getmsgint64(&s);
3668 369620 : end_lsn = pq_getmsgint64(&s);
3669 369620 : send_time = pq_getmsgint64(&s);
3670 :
3671 369620 : if (last_received < start_lsn)
3672 330240 : last_received = start_lsn;
3673 :
3674 369620 : if (last_received < end_lsn)
3675 0 : last_received = end_lsn;
3676 :
3677 369620 : UpdateWorkerStats(last_received, send_time, false);
3678 :
3679 369620 : apply_dispatch(&s);
3680 : }
3681 10994 : else if (c == 'k')
3682 : {
3683 : XLogRecPtr end_lsn;
3684 : TimestampTz timestamp;
3685 : bool reply_requested;
3686 :
3687 10994 : end_lsn = pq_getmsgint64(&s);
3688 10994 : timestamp = pq_getmsgint64(&s);
3689 10994 : reply_requested = pq_getmsgbyte(&s);
3690 :
3691 10994 : if (last_received < end_lsn)
3692 1524 : last_received = end_lsn;
3693 :
3694 10994 : send_feedback(last_received, reply_requested, false);
3695 10994 : UpdateWorkerStats(last_received, timestamp, true);
3696 : }
3697 : /* other message types are purposefully ignored */
3698 :
3699 380554 : MemoryContextReset(ApplyMessageContext);
3700 : }
3701 :
3702 380554 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3703 : }
3704 : }
3705 :
3706 : /* confirm all writes so far */
3707 74394 : send_feedback(last_received, false, false);
3708 :
3709 74394 : if (!in_remote_transaction && !in_streamed_transaction)
3710 : {
3711 : /*
3712 : * If we didn't get any transactions for a while there might be
3713 : * unconsumed invalidation messages in the queue, consume them
3714 : * now.
3715 : */
3716 13078 : AcceptInvalidationMessages();
3717 13078 : maybe_reread_subscription();
3718 :
3719 : /* Process any table synchronization changes. */
3720 13012 : process_syncing_tables(last_received);
3721 : }
3722 :
3723 : /* Cleanup the memory. */
3724 73966 : MemoryContextReset(ApplyMessageContext);
3725 73966 : MemoryContextSwitchTo(TopMemoryContext);
3726 :
3727 : /* Check if we need to exit the streaming loop. */
3728 73966 : if (endofstream)
3729 14 : break;
3730 :
3731 : /*
3732 : * Wait for more data or latch. If we have unflushed transactions,
3733 : * wake up after WalWriterDelay to see if they've been flushed yet (in
3734 : * which case we should send a feedback message). Otherwise, there's
3735 : * no particular urgency about waking up unless we get data or a
3736 : * signal.
3737 : */
3738 73952 : if (!dlist_is_empty(&lsn_mapping))
3739 27618 : wait_time = WalWriterDelay;
3740 : else
3741 46334 : wait_time = NAPTIME_PER_CYCLE;
3742 :
3743 73952 : rc = WaitLatchOrSocket(MyLatch,
3744 : WL_SOCKET_READABLE | WL_LATCH_SET |
3745 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
3746 : fd, wait_time,
3747 : WAIT_EVENT_LOGICAL_APPLY_MAIN);
3748 :
3749 73952 : if (rc & WL_LATCH_SET)
3750 : {
3751 960 : ResetLatch(MyLatch);
3752 960 : CHECK_FOR_INTERRUPTS();
3753 : }
3754 :
3755 73784 : if (ConfigReloadPending)
3756 : {
3757 16 : ConfigReloadPending = false;
3758 16 : ProcessConfigFile(PGC_SIGHUP);
3759 : }
3760 :
3761 73784 : if (rc & WL_TIMEOUT)
3762 : {
3763 : /*
3764 : * We didn't receive anything new. If we haven't heard anything
3765 : * from the server for more than wal_receiver_timeout / 2, ping
3766 : * the server. Also, if it's been longer than
3767 : * wal_receiver_status_interval since the last update we sent,
3768 : * send a status update to the primary anyway, to report any
3769 : * progress in applying WAL.
3770 : */
3771 274 : bool requestReply = false;
3772 :
3773 : /*
3774 : * Check if time since last receive from primary has reached the
3775 : * configured limit.
3776 : */
3777 274 : if (wal_receiver_timeout > 0)
3778 : {
3779 274 : TimestampTz now = GetCurrentTimestamp();
3780 : TimestampTz timeout;
3781 :
3782 274 : timeout =
3783 274 : TimestampTzPlusMilliseconds(last_recv_timestamp,
3784 : wal_receiver_timeout);
3785 :
3786 274 : if (now >= timeout)
3787 0 : ereport(ERROR,
3788 : (errcode(ERRCODE_CONNECTION_FAILURE),
3789 : errmsg("terminating logical replication worker due to timeout")));
3790 :
3791 : /* Check to see if it's time for a ping. */
3792 274 : if (!ping_sent)
3793 : {
3794 274 : timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3795 : (wal_receiver_timeout / 2));
3796 274 : if (now >= timeout)
3797 : {
3798 0 : requestReply = true;
3799 0 : ping_sent = true;
3800 : }
3801 : }
3802 : }
3803 :
3804 274 : send_feedback(last_received, requestReply, requestReply);
3805 :
3806 : /*
3807 : * Force reporting to ensure long idle periods don't lead to
3808 : * arbitrarily delayed stats. Stats can only be reported outside
3809 : * of (implicit or explicit) transactions. That shouldn't lead to
3810 : * stats being delayed for long, because transactions are either
3811 : * sent as a whole on commit or streamed. Streamed transactions
3812 : * are spilled to disk and applied on commit.
3813 : */
3814 274 : if (!IsTransactionState())
3815 274 : pgstat_report_stat(true);
3816 : }
3817 : }
3818 :
3819 : /* Pop the error context stack */
3820 14 : error_context_stack = errcallback.previous;
3821 14 : apply_error_context_stack = error_context_stack;
3822 :
3823 : /* All done */
3824 14 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
3825 0 : }
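/*
 * Illustrative, self-contained sketch (not part of worker.c) of the two
 * copy-data message headers consumed by the loop above.  A 'w' (WAL data)
 * message carries start LSN, end LSN and send time as 64-bit big-endian
 * integers followed by the logical replication payload; a 'k' (keepalive)
 * message carries end LSN, timestamp and a reply-requested flag byte.
 * The decoder below is a standalone stand-in for the pq_getmsgbyte() /
 * pq_getmsgint64() calls; the byte layout is an assumption stated here,
 * taken from the field order read by the code above.
 */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

static uint64_t
demo_get_be64(const unsigned char *p)
{
	uint64_t	v = 0;

	for (int i = 0; i < 8; i++)
		v = (v << 8) | p[i];
	return v;
}

static void
demo_decode(const unsigned char *buf, size_t len)
{
	if (len < 1)
		return;

	if (buf[0] == 'w' && len >= 25)
	{
		printf("WAL data: start %llu end %llu sent %llu, %zu payload bytes\n",
			   (unsigned long long) demo_get_be64(buf + 1),
			   (unsigned long long) demo_get_be64(buf + 9),
			   (unsigned long long) demo_get_be64(buf + 17),
			   len - 25);
	}
	else if (buf[0] == 'k' && len >= 18)
	{
		printf("keepalive: end %llu sent %llu reply requested %d\n",
			   (unsigned long long) demo_get_be64(buf + 1),
			   (unsigned long long) demo_get_be64(buf + 9),
			   buf[17]);
	}
	/* other message types are purposefully ignored, as above */
}

int
main(void)
{
	unsigned char k[18] = {'k'};

	k[8] = 42;					/* end LSN = 42, big-endian */
	demo_decode(k, sizeof(k));
	return 0;
}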
3826 :
3827 : /*
3828 : * Send a Standby Status Update message to server.
3829 : *
3830 : * 'recvpos' is the latest LSN we've received data to; 'force' is set if we
3831 : * need to send a response to avoid timeouts.
3832 : */
3833 : static void
3834 85662 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3835 : {
3836 : static StringInfo reply_message = NULL;
3837 : static TimestampTz send_time = 0;
3838 :
3839 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3840 : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3841 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3842 :
3843 : XLogRecPtr writepos;
3844 : XLogRecPtr flushpos;
3845 : TimestampTz now;
3846 : bool have_pending_txes;
3847 :
3848 : /*
3849 : * If the user doesn't want status to be reported to the publisher, be
3850 : * sure to exit before doing anything at all.
3851 : */
3852 85662 : if (!force && wal_receiver_status_interval <= 0)
3853 34736 : return;
3854 :
3855 : /* It's legal to not pass a recvpos */
3856 85662 : if (recvpos < last_recvpos)
3857 0 : recvpos = last_recvpos;
3858 :
3859 85662 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
3860 :
3861 : /*
3862 : * With no outstanding transactions to flush, we can report the latest
3863 : * received position. This is important for synchronous replication.
3864 : */
3865 85662 : if (!have_pending_txes)
3866 48952 : flushpos = writepos = recvpos;
3867 :
3868 85662 : if (writepos < last_writepos)
3869 0 : writepos = last_writepos;
3870 :
3871 85662 : if (flushpos < last_flushpos)
3872 36624 : flushpos = last_flushpos;
3873 :
3874 85662 : now = GetCurrentTimestamp();
3875 :
3876 : /* if we've already reported everything we're good */
3877 85662 : if (!force &&
3878 77658 : writepos == last_writepos &&
3879 35210 : flushpos == last_flushpos &&
3880 34966 : !TimestampDifferenceExceeds(send_time, now,
3881 : wal_receiver_status_interval * 1000))
3882 34736 : return;
3883 50926 : send_time = now;
3884 :
3885 50926 : if (!reply_message)
3886 : {
3887 706 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
3888 :
3889 706 : reply_message = makeStringInfo();
3890 706 : MemoryContextSwitchTo(oldctx);
3891 : }
3892 : else
3893 50220 : resetStringInfo(reply_message);
3894 :
3895 50926 : pq_sendbyte(reply_message, 'r');
3896 50926 : pq_sendint64(reply_message, recvpos); /* write */
3897 50926 : pq_sendint64(reply_message, flushpos); /* flush */
3898 50926 : pq_sendint64(reply_message, writepos); /* apply */
3899 50926 : pq_sendint64(reply_message, now); /* sendTime */
3900 50926 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
3901 :
3902 50926 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3903 : force,
3904 : LSN_FORMAT_ARGS(recvpos),
3905 : LSN_FORMAT_ARGS(writepos),
3906 : LSN_FORMAT_ARGS(flushpos));
3907 :
3908 50926 : walrcv_send(LogRepWorkerWalRcvConn,
3909 : reply_message->data, reply_message->len);
3910 :
3911 50926 : if (recvpos > last_recvpos)
3912 42452 : last_recvpos = recvpos;
3913 50926 : if (writepos > last_writepos)
3914 42454 : last_writepos = writepos;
3915 50926 : if (flushpos > last_flushpos)
3916 41962 : last_flushpos = flushpos;
3917 : }
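/*
 * Illustrative, self-contained sketch (not part of worker.c) of the
 * Standby Status Update message assembled by send_feedback() above: an
 * 'r' byte, then write/flush/apply positions and the send time as 64-bit
 * big-endian integers, then a reply-requested flag byte.  The buffer
 * packing below stands in for the pq_sendbyte()/pq_sendint64() calls;
 * actually sending it over a replication connection is out of scope.
 */
#include <stdint.h>
#include <stdio.h>

static size_t
demo_put_be64(unsigned char *p, uint64_t v)
{
	for (int i = 7; i >= 0; i--)
	{
		p[i] = (unsigned char) (v & 0xFF);
		v >>= 8;
	}
	return 8;
}

static size_t
demo_build_feedback(unsigned char *buf,
					uint64_t recvpos, uint64_t flushpos, uint64_t writepos,
					uint64_t send_time, int reply_requested)
{
	size_t		off = 0;

	buf[off++] = 'r';
	off += demo_put_be64(buf + off, recvpos);	/* write */
	off += demo_put_be64(buf + off, flushpos);	/* flush */
	off += demo_put_be64(buf + off, writepos);	/* apply */
	off += demo_put_be64(buf + off, send_time);	/* sendTime */
	buf[off++] = (unsigned char) (reply_requested != 0);	/* replyRequested */
	return off;
}

int
main(void)
{
	unsigned char msg[64];
	size_t		len = demo_build_feedback(msg, 3000, 2000, 2500, 0, 0);

	printf("feedback message is %zu bytes, type '%c'\n", len, msg[0]);
	return 0;
}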
3918 :
3919 : /*
3920 : * Exit routine for apply workers due to subscription parameter changes.
3921 : */
3922 : static void
3923 70 : apply_worker_exit(void)
3924 : {
3925 70 : if (am_parallel_apply_worker())
3926 : {
3927 : /*
3928 : * Don't stop the parallel apply worker as the leader will detect the
3929 : * subscription parameter change and restart logical replication later
3930 : * anyway. This also prevents the leader from reporting errors when
3931 : * trying to communicate with a stopped parallel apply worker, which
3932 : * would accidentally disable subscriptions if disable_on_error was
3933 : * set.
3934 : */
3935 0 : return;
3936 : }
3937 :
3938 : /*
3939 : * Reset the last-start time for this apply worker so that the launcher
3940 : * will restart it without waiting for wal_retrieve_retry_interval if the
3941 : * subscription is still active, and so that we won't leak that hash table
3942 : * entry if it isn't.
3943 : */
3944 70 : if (am_leader_apply_worker())
3945 70 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
3946 :
3947 70 : proc_exit(0);
3948 : }
3949 :
3950 : /*
3951 : * Reread subscription info if needed. Most changes will cause the worker to exit.
3952 : */
3953 : void
3954 15070 : maybe_reread_subscription(void)
3955 : {
3956 : MemoryContext oldctx;
3957 : Subscription *newsub;
3958 15070 : bool started_tx = false;
3959 :
3960 : /* When cache state is valid there is nothing to do here. */
3961 15070 : if (MySubscriptionValid)
3962 14928 : return;
3963 :
3964 : /* This function might be called inside or outside of transaction. */
3965 142 : if (!IsTransactionState())
3966 : {
3967 132 : StartTransactionCommand();
3968 132 : started_tx = true;
3969 : }
3970 :
3971 : /* Ensure allocations in permanent context. */
3972 142 : oldctx = MemoryContextSwitchTo(ApplyContext);
3973 :
3974 142 : newsub = GetSubscription(MyLogicalRepWorker->subid, true);
3975 :
3976 : /*
3977 : * Exit if the subscription was removed. This normally should not happen
3978 : * as the worker gets killed during DROP SUBSCRIPTION.
3979 : */
3980 142 : if (!newsub)
3981 : {
3982 0 : ereport(LOG,
3983 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3984 : MySubscription->name)));
3985 :
3986 : /* Ensure we remove no-longer-useful entry for worker's start time */
3987 0 : if (am_leader_apply_worker())
3988 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
3989 :
3990 0 : proc_exit(0);
3991 : }
3992 :
3993 : /* Exit if the subscription was disabled. */
3994 142 : if (!newsub->enabled)
3995 : {
3996 16 : ereport(LOG,
3997 : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
3998 : MySubscription->name)));
3999 :
4000 16 : apply_worker_exit();
4001 : }
4002 :
4003 : /* !slotname should never happen when enabled is true. */
4004 : Assert(newsub->slotname);
4005 :
4006 : /* two-phase cannot be altered while the worker is running */
4007 : Assert(newsub->twophasestate == MySubscription->twophasestate);
4008 :
4009 : /*
4010 : * Exit if any parameter that affects the remote connection was changed.
4011 : * The launcher will start a new worker but note that the parallel apply
4012 : * worker won't restart if the streaming option's value is changed from
4013 : * 'parallel' to any other value or the server decides not to stream the
4014 : * in-progress transaction.
4015 : */
4016 126 : if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
4017 122 : strcmp(newsub->name, MySubscription->name) != 0 ||
4018 120 : strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
4019 120 : newsub->binary != MySubscription->binary ||
4020 108 : newsub->stream != MySubscription->stream ||
4021 98 : newsub->passwordrequired != MySubscription->passwordrequired ||
4022 98 : strcmp(newsub->origin, MySubscription->origin) != 0 ||
4023 98 : newsub->owner != MySubscription->owner ||
4024 96 : !equal(newsub->publications, MySubscription->publications))
4025 : {
4026 46 : if (am_parallel_apply_worker())
4027 0 : ereport(LOG,
4028 : (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
4029 : MySubscription->name)));
4030 : else
4031 46 : ereport(LOG,
4032 : (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
4033 : MySubscription->name)));
4034 :
4035 46 : apply_worker_exit();
4036 : }
4037 :
4038 : /*
4039 : * Exit if the subscription owner's superuser privileges have been
4040 : * revoked.
4041 : */
4042 80 : if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
4043 : {
4044 8 : if (am_parallel_apply_worker())
4045 0 : ereport(LOG,
4046 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
4047 : MySubscription->name));
4048 : else
4049 8 : ereport(LOG,
4050 : errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
4051 : MySubscription->name));
4052 :
4053 8 : apply_worker_exit();
4054 : }
4055 :
4056 : /* Check for other changes that should never happen too. */
4057 72 : if (newsub->dbid != MySubscription->dbid)
4058 : {
4059 0 : elog(ERROR, "subscription %u changed unexpectedly",
4060 : MyLogicalRepWorker->subid);
4061 : }
4062 :
4063 : /* Clean old subscription info and switch to new one. */
4064 72 : FreeSubscription(MySubscription);
4065 72 : MySubscription = newsub;
4066 :
4067 72 : MemoryContextSwitchTo(oldctx);
4068 :
4069 : /* Change synchronous commit according to the user's wishes */
4070 72 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
4071 : PGC_BACKEND, PGC_S_OVERRIDE);
4072 :
4073 72 : if (started_tx)
4074 66 : CommitTransactionCommand();
4075 :
4076 72 : MySubscriptionValid = true;
4077 : }
4078 :
4079 : /*
4080 : * Callback from subscription syscache invalidation.
4081 : */
4082 : static void
4083 148 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
4084 : {
4085 148 : MySubscriptionValid = false;
4086 148 : }
4087 :
4088 : /*
4089 : * subxact_info_write
4090 : * Store information about subxacts for a toplevel transaction.
4091 : *
4092 : * For each subxact we store the offset of its first change in the main file.
4093 : * The file is always overwritten as a whole.
4094 : *
4095 : * XXX We should only store subxacts that were not aborted yet.
4096 : */
4097 : static void
4098 792 : subxact_info_write(Oid subid, TransactionId xid)
4099 : {
4100 : char path[MAXPGPATH];
4101 : Size len;
4102 : BufFile *fd;
4103 :
4104 : Assert(TransactionIdIsValid(xid));
4105 :
4106 : /* construct the subxact filename */
4107 792 : subxact_filename(path, subid, xid);
4108 :
4109 : /* Delete the subxacts file, if exists. */
4110 792 : if (subxact_data.nsubxacts == 0)
4111 : {
4112 628 : cleanup_subxact_info();
4113 628 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4114 :
4115 628 : return;
4116 : }
4117 :
4118 : /*
4119 : * Create the subxact file if it does not already exist, otherwise open the
4120 : * existing file.
4121 : */
4122 164 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
4123 : true);
4124 164 : if (fd == NULL)
4125 16 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
4126 :
4127 164 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4128 :
4129 : /* Write the subxact count and subxact info */
4130 164 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4131 164 : BufFileWrite(fd, subxact_data.subxacts, len);
4132 :
4133 164 : BufFileClose(fd);
4134 :
4135 : /* free the memory allocated for subxact info */
4136 164 : cleanup_subxact_info();
4137 : }
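/*
 * Illustrative, self-contained sketch (not part of worker.c) of the
 * subxact file layout written above: a count followed by that many
 * fixed-size records, each remembering a subtransaction XID and the
 * position of its first change in the changes file.  stdio stands in for
 * the BufFile API, and the field types below are simplified assumptions,
 * not the exact SubXactInfo definition.
 */
#include <stdint.h>
#include <stdio.h>

typedef struct demo_subxact
{
	uint32_t	xid;
	int32_t		fileno;			/* position of the first change ... */
	int64_t		offset;			/* ... within that file */
} demo_subxact;

int
main(void)
{
	demo_subxact out[2] = {{701, 0, 128}, {702, 0, 4096}};
	uint32_t	count = 2;
	FILE	   *f = fopen("demo.subxacts", "wb");

	/* write the subxact count and subxact info, as in subxact_info_write */
	fwrite(&count, sizeof(count), 1, f);
	fwrite(out, sizeof(demo_subxact), count, f);
	fclose(f);

	/* read it back, as in subxact_info_read */
	demo_subxact in[2];
	uint32_t	n = 0;

	f = fopen("demo.subxacts", "rb");
	fread(&n, sizeof(n), 1, f);
	fread(in, sizeof(demo_subxact), n, f);
	fclose(f);

	printf("%u subxacts, first xid %u at offset %lld\n",
		   n, in[0].xid, (long long) in[0].offset);
	return 0;
}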
4138 :
4139 : /*
4140 : * subxact_info_read
4141 : * Restore information about subxacts of a streamed transaction.
4142 : *
4143 : * Read information about subxacts into the structure subxact_data that can be
4144 : * used later.
4145 : */
4146 : static void
4147 736 : subxact_info_read(Oid subid, TransactionId xid)
4148 : {
4149 : char path[MAXPGPATH];
4150 : Size len;
4151 : BufFile *fd;
4152 : MemoryContext oldctx;
4153 :
4154 : Assert(!subxact_data.subxacts);
4155 : Assert(subxact_data.nsubxacts == 0);
4156 : Assert(subxact_data.nsubxacts_max == 0);
4157 :
4158 : /*
4159 : * If the subxact file doesn't exist that means we don't have any subxact
4160 : * info.
4161 : */
4162 736 : subxact_filename(path, subid, xid);
4163 736 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
4164 : true);
4165 736 : if (fd == NULL)
4166 578 : return;
4167 :
4168 : /* read number of subxact items */
4169 158 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4170 :
4171 158 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4172 :
4173 : /* we keep the maximum as a power of 2 */
4174 158 : subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
4175 :
4176 : /*
4177 : * Allocate subxact information in the logical streaming context. We need
4178 : * this information for the whole stream so that we can add new
4179 : * subtransaction info to it. On stream stop we will flush this information
4180 : * to the subxact file and reset the logical streaming context.
4181 : */
4182 158 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4183 158 : subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
4184 : sizeof(SubXactInfo));
4185 158 : MemoryContextSwitchTo(oldctx);
4186 :
4187 158 : if (len > 0)
4188 158 : BufFileReadExact(fd, subxact_data.subxacts, len);
4189 :
4190 158 : BufFileClose(fd);
4191 : }
4192 :
4193 : /*
4194 : * subxact_info_add
4195 : * Add information about a subxact (offset in the main file).
4196 : */
4197 : static void
4198 205024 : subxact_info_add(TransactionId xid)
4199 : {
4200 205024 : SubXactInfo *subxacts = subxact_data.subxacts;
4201 : int64 i;
4202 :
4203 : /* We must have a valid top level stream xid and a stream fd. */
4204 : Assert(TransactionIdIsValid(stream_xid));
4205 : Assert(stream_fd != NULL);
4206 :
4207 : /*
4208 : * If the XID matches the toplevel transaction, we don't want to add it.
4209 : */
4210 205024 : if (stream_xid == xid)
4211 184776 : return;
4212 :
4213 : /*
4214 : * In most cases we're checking the same subxact as we've already seen in
4215 : * the last call, so make sure to ignore it (this change comes later).
4216 : */
4217 20248 : if (subxact_data.subxact_last == xid)
4218 20096 : return;
4219 :
4220 : /* OK, remember we're processing this XID. */
4221 152 : subxact_data.subxact_last = xid;
4222 :
4223 : /*
4224 : * Check if the transaction is already present in the array of subxacts. We
4225 : * intentionally scan the array from the tail, because we're likely adding
4226 : * a change for the most recent subtransactions.
4227 : *
4228 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4229 : * would allow us to use binary search here.
4230 : */
4231 190 : for (i = subxact_data.nsubxacts; i > 0; i--)
4232 : {
4233 : /* found, so we're done */
4234 152 : if (subxacts[i - 1].xid == xid)
4235 114 : return;
4236 : }
4237 :
4238 : /* This is a new subxact, so we need to add it to the array. */
4239 38 : if (subxact_data.nsubxacts == 0)
4240 : {
4241 : MemoryContext oldctx;
4242 :
4243 16 : subxact_data.nsubxacts_max = 128;
4244 :
4245 : /*
4246 : * Allocate this memory for subxacts in per-stream context, see
4247 : * subxact_info_read.
4248 : */
4249 16 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4250 16 : subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4251 16 : MemoryContextSwitchTo(oldctx);
4252 : }
4253 22 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
4254 : {
4255 20 : subxact_data.nsubxacts_max *= 2;
4256 20 : subxacts = repalloc(subxacts,
4257 20 : subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4258 : }
4259 :
4260 38 : subxacts[subxact_data.nsubxacts].xid = xid;
4261 :
4262 : /*
4263 : * Get the current offset of the stream file and store it as offset of
4264 : * this subxact.
4265 : */
4266 38 : BufFileTell(stream_fd,
4267 38 : &subxacts[subxact_data.nsubxacts].fileno,
4268 38 : &subxacts[subxact_data.nsubxacts].offset);
4269 :
4270 38 : subxact_data.nsubxacts++;
4271 38 : subxact_data.subxacts = subxacts;
4272 : }
4273 :
4274 : /* format filename for file containing the info about subxacts */
4275 : static inline void
4276 1590 : subxact_filename(char *path, Oid subid, TransactionId xid)
4277 : {
4278 1590 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4279 1590 : }
4280 :
4281 : /* format filename for file containing serialized changes */
4282 : static inline void
4283 924 : changes_filename(char *path, Oid subid, TransactionId xid)
4284 : {
4285 924 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4286 924 : }
4287 :
4288 : /*
4289 : * stream_cleanup_files
4290 : * Cleanup files for a subscription / toplevel transaction.
4291 : *
4292 : * Remove files with serialized changes and subxact info for a particular
4293 : * toplevel transaction. Each subscription has a separate set of files
4294 : * for any toplevel transaction.
4295 : */
4296 : void
4297 62 : stream_cleanup_files(Oid subid, TransactionId xid)
4298 : {
4299 : char path[MAXPGPATH];
4300 :
4301 : /* Delete the changes file. */
4302 62 : changes_filename(path, subid, xid);
4303 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
4304 :
4305 : /* Delete the subxact file, if it exists. */
4306 62 : subxact_filename(path, subid, xid);
4307 62 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4308 62 : }
4309 :
4310 : /*
4311 : * stream_open_file
4312 : * Open a file that we'll use to serialize changes for a toplevel
4313 : * transaction.
4314 : *
4315 : * Open a file for streamed changes from a toplevel transaction identified
4316 : * by stream_xid (global variable). If it's the first chunk of streamed
4317 : * changes for this transaction, create the buffile, otherwise open the
4318 : * previously created file.
4319 : */
4320 : static void
4321 774 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4322 : {
4323 : char path[MAXPGPATH];
4324 : MemoryContext oldcxt;
4325 :
4326 : Assert(OidIsValid(subid));
4327 : Assert(TransactionIdIsValid(xid));
4328 : Assert(stream_fd == NULL);
4329 :
4330 :
4331 774 : changes_filename(path, subid, xid);
4332 774 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4333 :
4334 : /*
4335 : * Create/open the buffiles under the logical streaming context so that we
4336 : * have those files until stream stop.
4337 : */
4338 774 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
4339 :
4340 : /*
4341 : * If this is the first streamed segment, create the changes file.
4342 : * Otherwise, just open the file for writing, in append mode.
4343 : */
4344 774 : if (first_segment)
4345 64 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
4346 : path);
4347 : else
4348 : {
4349 : /*
4350 : * Open the file and seek to the end of the file because we always
4351 : * append to the changes file.
4352 : */
4353 710 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
4354 : path, O_RDWR, false);
4355 710 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
4356 : }
4357 :
4358 774 : MemoryContextSwitchTo(oldcxt);
4359 774 : }
4360 :
4361 : /*
4362 : * stream_close_file
4363 : * Close the currently open file with streamed changes.
4364 : */
4365 : static void
4366 834 : stream_close_file(void)
4367 : {
4368 : Assert(stream_fd != NULL);
4369 :
4370 834 : BufFileClose(stream_fd);
4371 :
4372 834 : stream_fd = NULL;
4373 834 : }
4374 :
4375 : /*
4376 : * stream_write_change
4377 : * Serialize a change to a file for the current toplevel transaction.
4378 : *
4379 : * The change is serialized in a simple format: a length (not counting the
4380 : * length field itself), an action code (identifying the message type) and
4381 : * the message contents (without the subxact TransactionId value).
4382 : */
4383 : static void
4384 215106 : stream_write_change(char action, StringInfo s)
4385 : {
4386 : int len;
4387 :
4388 : Assert(stream_fd != NULL);
4389 :
4390 : /* total on-disk size, including the action type character */
4391 215106 : len = (s->len - s->cursor) + sizeof(char);
4392 :
4393 : /* first write the size */
4394 215106 : BufFileWrite(stream_fd, &len, sizeof(len));
4395 :
4396 : /* then the action */
4397 215106 : BufFileWrite(stream_fd, &action, sizeof(action));
4398 :
4399 : /* and finally the remaining part of the buffer (after the XID) */
4400 215106 : len = (s->len - s->cursor);
4401 :
4402 215106 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
4403 215106 : }
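/*
 * Illustrative, self-contained sketch (not part of worker.c) of the
 * per-change record layout used by stream_write_change() above: a length
 * word covering the action byte plus the message body, then the action
 * byte, then the body.  stdio in append mode stands in for the BufFile
 * opened by stream_open_file(); the field widths follow the sizeof()
 * calls in the code above.
 */
#include <stdio.h>

static void
demo_write_change(FILE *f, char action, const char *body, int body_len)
{
	int			len = body_len + (int) sizeof(char);	/* action + body */

	fwrite(&len, sizeof(len), 1, f);		/* first write the size */
	fwrite(&action, sizeof(action), 1, f);	/* then the action */
	fwrite(body, 1, body_len, f);			/* finally the message body */
}

int
main(void)
{
	/* the first segment creates the file; later ones append to it */
	FILE	   *f = fopen("demo.changes", "ab");

	demo_write_change(f, 'I', "row-data", 8);
	demo_write_change(f, 'U', "row-data", 8);
	fclose(f);
	return 0;
}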
4404 :
4405 : /*
4406 : * stream_open_and_write_change
4407 : * Serialize a message to a file for the given transaction.
4408 : *
4409 : * This function is similar to stream_write_change except that it opens the
4410 : * target file if it is not already open before writing the message, and
4411 : * closes the file at the end.
4412 : */
4413 : static void
4414 10 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
4415 : {
4416 : Assert(!in_streamed_transaction);
4417 :
4418 10 : if (!stream_fd)
4419 10 : stream_start_internal(xid, false);
4420 :
4421 10 : stream_write_change(action, s);
4422 10 : stream_stop_internal(xid);
4423 10 : }
4424 :
4425 : /*
4426 : * Sets streaming options including replication slot name and origin start
4427 : * position. Workers need these options for logical replication.
4428 : */
4429 : void
4430 706 : set_stream_options(WalRcvStreamOptions *options,
4431 : char *slotname,
4432 : XLogRecPtr *origin_startpos)
4433 : {
4434 : int server_version;
4435 :
4436 706 : options->logical = true;
4437 706 : options->startpoint = *origin_startpos;
4438 706 : options->slotname = slotname;
4439 :
4440 706 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
4441 706 : options->proto.logical.proto_version =
4442 706 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
4443 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
4444 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
4445 : LOGICALREP_PROTO_VERSION_NUM;
4446 :
4447 706 : options->proto.logical.publication_names = MySubscription->publications;
4448 706 : options->proto.logical.binary = MySubscription->binary;
4449 :
4450 : /*
4451 : * Assign the appropriate value for the streaming option according to the
4452 : * 'streaming' mode and the publisher's ability to support that mode.
4453 : */
4454 706 : if (server_version >= 160000 &&
4455 706 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
4456 : {
4457 638 : options->proto.logical.streaming_str = "parallel";
4458 638 : MyLogicalRepWorker->parallel_apply = true;
4459 : }
4460 68 : else if (server_version >= 140000 &&
4461 68 : MySubscription->stream != LOGICALREP_STREAM_OFF)
4462 : {
4463 52 : options->proto.logical.streaming_str = "on";
4464 52 : MyLogicalRepWorker->parallel_apply = false;
4465 : }
4466 : else
4467 : {
4468 16 : options->proto.logical.streaming_str = NULL;
4469 16 : MyLogicalRepWorker->parallel_apply = false;
4470 : }
4471 :
4472 706 : options->proto.logical.twophase = false;
4473 706 : options->proto.logical.origin = pstrdup(MySubscription->origin);
4474 706 : }
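/*
 * Illustrative, self-contained sketch (not part of worker.c) of the
 * protocol-version negotiation in set_stream_options() above: the
 * requested protocol version is capped by what the publisher's server
 * version can support.  The DEMO_PROTO_* constants are hypothetical
 * stand-ins; the real values come from the LOGICALREP_PROTO_* macros
 * used in the code above.
 */
#include <stdio.h>

#define DEMO_PROTO_BASE            1	/* stand-in values only */
#define DEMO_PROTO_STREAM          2
#define DEMO_PROTO_TWOPHASE        3
#define DEMO_PROTO_STREAM_PARALLEL 4

static int
demo_choose_proto_version(int server_version)
{
	return server_version >= 160000 ? DEMO_PROTO_STREAM_PARALLEL :
		server_version >= 150000 ? DEMO_PROTO_TWOPHASE :
		server_version >= 140000 ? DEMO_PROTO_STREAM :
		DEMO_PROTO_BASE;
}

int
main(void)
{
	printf("v15 publisher -> protocol %d\n", demo_choose_proto_version(150004));
	printf("v13 publisher -> protocol %d\n", demo_choose_proto_version(130011));
	return 0;
}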
4475 :
4476 : /*
4477 : * Cleanup the memory for subxacts and reset the related variables.
4478 : */
4479 : static inline void
4480 800 : cleanup_subxact_info()
4481 : {
4482 800 : if (subxact_data.subxacts)
4483 174 : pfree(subxact_data.subxacts);
4484 :
4485 800 : subxact_data.subxacts = NULL;
4486 800 : subxact_data.subxact_last = InvalidTransactionId;
4487 800 : subxact_data.nsubxacts = 0;
4488 800 : subxact_data.nsubxacts_max = 0;
4489 800 : }
4490 :
4491 : /*
4492 : * Common function to run the apply loop with error handling. Disable the
4493 : * subscription, if necessary.
4494 : *
4495 : * Note that we don't handle FATAL errors, which are probably due to a
4496 : * system resource error and are not repeatable.
4497 : */
4498 : void
4499 706 : start_apply(XLogRecPtr origin_startpos)
4500 : {
4501 706 : PG_TRY();
4502 : {
4503 706 : LogicalRepApplyLoop(origin_startpos);
4504 : }
4505 106 : PG_CATCH();
4506 : {
4507 106 : if (MySubscription->disableonerr)
4508 6 : DisableSubscriptionAndExit();
4509 : else
4510 : {
4511 : /*
4512 : * Report the worker failed while applying changes. Abort the
4513 : * current transaction so that the stats message is sent in an
4514 : * idle state.
4515 : */
4516 100 : AbortOutOfAnyTransaction();
4517 100 : pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
4518 :
4519 100 : PG_RE_THROW();
4520 : }
4521 : }
4522 0 : PG_END_TRY();
4523 0 : }
4524 :
4525 : /*
4526 : * Runs the leader apply worker.
4527 : *
4528 : * It sets up replication origin, streaming options and then starts streaming.
4529 : */
4530 : static void
4531 404 : run_apply_worker()
4532 : {
4533 : char originname[NAMEDATALEN];
4534 404 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4535 404 : char *slotname = NULL;
4536 : WalRcvStreamOptions options;
4537 : RepOriginId originid;
4538 : TimeLineID startpointTLI;
4539 : char *err;
4540 : bool must_use_password;
4541 :
4542 404 : slotname = MySubscription->slotname;
4543 :
4544 : /*
4545 : * This shouldn't happen if the subscription is enabled, but guard against
4546 : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4547 : * slot is NULL.)
4548 : */
4549 404 : if (!slotname)
4550 0 : ereport(ERROR,
4551 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4552 : errmsg("subscription has no replication slot set")));
4553 :
4554 : /* Setup replication origin tracking. */
4555 404 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
4556 : originname, sizeof(originname));
4557 404 : StartTransactionCommand();
4558 404 : originid = replorigin_by_name(originname, true);
4559 404 : if (!OidIsValid(originid))
4560 0 : originid = replorigin_create(originname);
4561 404 : replorigin_session_setup(originid, 0);
4562 404 : replorigin_session_origin = originid;
4563 404 : origin_startpos = replorigin_session_get_progress(false);
4564 404 : CommitTransactionCommand();
4565 :
4566 : /* Is the use of a password mandatory? */
4567 772 : must_use_password = MySubscription->passwordrequired &&
4568 368 : !MySubscription->ownersuperuser;
4569 :
4570 404 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
4571 : true, must_use_password,
4572 : MySubscription->name, &err);
4573 :
4574 390 : if (LogRepWorkerWalRcvConn == NULL)
4575 32 : ereport(ERROR,
4576 : (errcode(ERRCODE_CONNECTION_FAILURE),
4577 : errmsg("apply worker for subscription \"%s\" could not connect to the publisher: %s",
4578 : MySubscription->name, err)));
4579 :
4580 : /*
4581 : * We don't really use the output of identify_system for anything, but it
4582 : * does some initializations on the upstream so let's still call it.
4583 : */
4584 358 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4585 :
4586 358 : set_apply_error_context_origin(originname);
4587 :
4588 358 : set_stream_options(&options, slotname, &origin_startpos);
4589 :
4590 : /*
4591 : * Even when the two_phase mode is requested by the user, it remains as
4592 : * the tri-state PENDING until all tablesyncs have reached READY state.
4593 : * Only then can it become ENABLED.
4594 : *
4595 : * Note: If the subscription has no tables then leave the state as
4596 : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4597 : * work.
4598 : */
4599 384 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
4600 26 : AllTablesyncsReady())
4601 : {
4602 : /* Start streaming with two_phase enabled */
4603 12 : options.proto.logical.twophase = true;
4604 12 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4605 :
4606 12 : StartTransactionCommand();
4607 12 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
4608 12 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
4609 12 : CommitTransactionCommand();
4610 : }
4611 : else
4612 : {
4613 346 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4614 : }
4615 :
4616 358 : ereport(DEBUG1,
4617 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4618 : MySubscription->name,
4619 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
4620 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
4621 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
4622 : "?")));
4623 :
4624 : /* Run the main loop. */
4625 358 : start_apply(origin_startpos);
4626 0 : }
4627 :
4628 : /*
4629 : * Common initialization for leader apply worker, parallel apply worker and
4630 : * tablesync worker.
4631 : *
4632 : * Initialize the database connection, in-memory subscription and necessary
4633 : * config options.
4634 : */
4635 : void
4636 802 : InitializeLogRepWorker(void)
4637 : {
4638 : MemoryContext oldctx;
4639 :
4640 : /* Run as replica session replication role. */
4641 802 : SetConfigOption("session_replication_role", "replica",
4642 : PGC_SUSET, PGC_S_OVERRIDE);
4643 :
4644 : /* Connect to our database. */
4645 802 : BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
4646 802 : MyLogicalRepWorker->userid,
4647 : 0);
4648 :
4649 : /*
4650 : * Set always-secure search path, so malicious users can't redirect user
4651 : * code (e.g. pg_index.indexprs).
4652 : */
4653 792 : SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4654 :
4655 : /* Load the subscription into persistent memory context. */
4656 792 : ApplyContext = AllocSetContextCreate(TopMemoryContext,
4657 : "ApplyContext",
4658 : ALLOCSET_DEFAULT_SIZES);
4659 792 : StartTransactionCommand();
4660 792 : oldctx = MemoryContextSwitchTo(ApplyContext);
4661 :
4662 792 : MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
4663 792 : if (!MySubscription)
4664 : {
4665 0 : ereport(LOG,
4666 : (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4667 : MyLogicalRepWorker->subid)));
4668 :
4669 : /* Ensure we remove no-longer-useful entry for worker's start time */
4670 0 : if (am_leader_apply_worker())
4671 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4672 :
4673 0 : proc_exit(0);
4674 : }
4675 :
4676 792 : MySubscriptionValid = true;
4677 792 : MemoryContextSwitchTo(oldctx);
4678 :
4679 792 : if (!MySubscription->enabled)
4680 : {
4681 0 : ereport(LOG,
4682 : (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4683 : MySubscription->name)));
4684 :
4685 0 : apply_worker_exit();
4686 : }
4687 :
4688 : /* Setup synchronous commit according to the user's wishes */
4689 792 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
4690 : PGC_BACKEND, PGC_S_OVERRIDE);
4691 :
4692 : /*
4693 : * Keep us informed about subscription or role changes. Note that the
4694 : * role's superuser privilege can be revoked.
4695 : */
4696 792 : CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4697 : subscription_change_cb,
4698 : (Datum) 0);
4699 :
4700 792 : CacheRegisterSyscacheCallback(AUTHOID,
4701 : subscription_change_cb,
4702 : (Datum) 0);
4703 :
4704 792 : if (am_tablesync_worker())
4705 368 : ereport(LOG,
4706 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4707 : MySubscription->name,
4708 : get_rel_name(MyLogicalRepWorker->relid))));
4709 : else
4710 424 : ereport(LOG,
4711 : (errmsg("logical replication apply worker for subscription \"%s\" has started",
4712 : MySubscription->name)));
4713 :
4714 792 : CommitTransactionCommand();
4715 792 : }
4716 :
4717 : /*
4718 : * Reset the origin state.
4719 : */
4720 : static void
4721 832 : replorigin_reset(int code, Datum arg)
4722 : {
4723 832 : replorigin_session_origin = InvalidRepOriginId;
4724 832 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
4725 832 : replorigin_session_origin_timestamp = 0;
4726 832 : }
4727 :
4728 : /* Common function to setup the leader apply or tablesync worker. */
4729 : void
4730 782 : SetupApplyOrSyncWorker(int worker_slot)
4731 : {
4732 : /* Attach to slot */
4733 782 : logicalrep_worker_attach(worker_slot);
4734 :
4735 : Assert(am_tablesync_worker() || am_leader_apply_worker());
4736 :
4737 : /* Setup signal handling */
4738 782 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
4739 782 : pqsignal(SIGTERM, die);
4740 782 : BackgroundWorkerUnblockSignals();
4741 :
4742 : /*
4743 : * We don't currently need any ResourceOwner in a walreceiver process, but
4744 : * if we did, we could call CreateAuxProcessResourceOwner here.
4745 : */
4746 :
4747 : /* Initialise stats to a sane-ish value */
4748 782 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
4749 782 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
4750 :
4751 : /* Load the libpq-specific functions */
4752 782 : load_file("libpqwalreceiver", false);
4753 :
4754 782 : InitializeLogRepWorker();
4755 :
4756 : /*
4757 : * Register a callback to reset the origin state before aborting any
4758 : * pending transaction during shutdown (see ShutdownPostgres()). This will
4759 : * avoid origin advancement for an incomplete transaction, which could
4760 : * otherwise lead to its loss because such a transaction won't be sent by
4761 : * the server again.
4762 : *
4763 : * Note that even a LOG or DEBUG statement placed after setting the origin
4764 : * state may process a shutdown signal before committing the current apply
4765 : * operation. So, it is important to register such a callback here.
4766 : */
4767 772 : before_shmem_exit(replorigin_reset, (Datum) 0);
4768 :
4769 : /* Connect to the origin and start the replication. */
4770 772 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4771 : MySubscription->conninfo);
4772 :
4773 : /*
4774 : * Setup callback for syscache so that we know when something changes in
4775 : * the subscription relation state.
4776 : */
4777 772 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4778 : invalidate_syncing_table_states,
4779 : (Datum) 0);
4780 772 : }
4781 :
4782 : /* Logical Replication Apply worker entry point */
4783 : void
4784 412 : ApplyWorkerMain(Datum main_arg)
4785 : {
4786 412 : int worker_slot = DatumGetInt32(main_arg);
4787 :
4788 412 : InitializingApplyWorker = true;
4789 :
4790 412 : SetupApplyOrSyncWorker(worker_slot);
4791 :
4792 404 : InitializingApplyWorker = false;
4793 :
4794 404 : run_apply_worker();
4795 :
4796 0 : proc_exit(0);
4797 : }
4798 :
4799 : /*
4800 : * After error recovery, disable the subscription in a new transaction
4801 : * and exit cleanly.
4802 : */
4803 : void
4804 8 : DisableSubscriptionAndExit(void)
4805 : {
4806 : /*
4807 : * Emit the error message, and recover from the error state to an idle
4808 : * state
4809 : */
4810 8 : HOLD_INTERRUPTS();
4811 :
4812 8 : EmitErrorReport();
4813 8 : AbortOutOfAnyTransaction();
4814 8 : FlushErrorState();
4815 :
4816 8 : RESUME_INTERRUPTS();
4817 :
4818 : /* Report that the worker failed during either table synchronization or apply */
4819 8 : pgstat_report_subscription_error(MyLogicalRepWorker->subid,
4820 8 : !am_tablesync_worker());
4821 :
4822 : /* Disable the subscription */
4823 8 : StartTransactionCommand();
4824 8 : DisableSubscription(MySubscription->oid);
4825 8 : CommitTransactionCommand();
4826 :
4827 : /* Ensure we remove the no-longer-useful entry for the worker's start time */
4828 8 : if (am_leader_apply_worker())
4829 6 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4830 :
4831 : /* Log that the subscription has been disabled, and exit */
4832 8 : ereport(LOG,
4833 : errmsg("subscription \"%s\" has been disabled because of an error",
4834 : MySubscription->name));
4835 :
4836 8 : proc_exit(0);
4837 : }
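/*
 * Editor's note (illustrative, not part of the original source): this exit
 * path corresponds to a subscription created with the disable_on_error
 * option.  A hypothetical recovery sequence, assuming the underlying apply
 * error has been fixed, could be as simple as:
 *
 *   -- re-enable the subscription so the launcher restarts the apply worker
 *   ALTER SUBSCRIPTION mysub ENABLE;
 *
 * ("mysub" is an example subscription name, not one used by this code.)
 */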
4838 :
4839 : /*
4840 : * Is the current process a logical replication worker?
4841 : */
4842 : bool
4843 3702 : IsLogicalWorker(void)
4844 : {
4845 3702 : return MyLogicalRepWorker != NULL;
4846 : }
4847 :
4848 : /*
4849 : * Is the current process a logical replication parallel apply worker?
4850 : */
4851 : bool
4852 2734 : IsLogicalParallelApplyWorker(void)
4853 : {
4854 2734 : return IsLogicalWorker() && am_parallel_apply_worker();
4855 : }
4856 :
4857 : /*
4858 : * Start skipping changes of the transaction if the given LSN matches the
4859 : * LSN specified by the subscription's skiplsn.
4860 : */
4861 : static void
4862 972 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
4863 : {
4864 : Assert(!is_skipping_changes());
4865 : Assert(!in_remote_transaction);
4866 : Assert(!in_streamed_transaction);
4867 :
4868 : /*
4869 : * Quick return if we are not requested to skip this transaction. This
4870 : * function is called for every remote transaction, and we assume that
4871 : * skipping transactions is not a common case.
4872 : */
4873 972 : if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
4874 : MySubscription->skiplsn != finish_lsn))
4875 966 : return;
4876 :
4877 : /* Start skipping all changes of this transaction */
4878 6 : skip_xact_finish_lsn = finish_lsn;
4879 :
4880 6 : ereport(LOG,
4881 : errmsg("logical replication starts skipping transaction at LSN %X/%X",
4882 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
4883 : }
4884 :
4885 : /*
4886 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4887 : */
4888 : static void
4889 54 : stop_skipping_changes(void)
4890 : {
4891 54 : if (!is_skipping_changes())
4892 48 : return;
4893 :
4894 6 : ereport(LOG,
4895 : (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4896 : LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
4897 :
4898 : /* Stop skipping changes */
4899 6 : skip_xact_finish_lsn = InvalidXLogRecPtr;
4900 : }
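/*
 * Editor's sketch (an assumption, not shown in this excerpt): the
 * is_skipping_changes() test used above is presumably a trivial check on
 * skip_xact_finish_lsn, along the lines of:
 *
 *   #define is_skipping_changes() \
 *       (!XLogRecPtrIsInvalid(skip_xact_finish_lsn))
 *
 * i.e. skipping is "armed" by maybe_start_skipping_changes() setting
 * skip_xact_finish_lsn, and "disarmed" here by resetting it to
 * InvalidXLogRecPtr.
 */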
4901 :
4902 : /*
4903 : * Clear the subskiplsn of the pg_subscription catalog.
4904 : *
4905 : * finish_lsn is the transaction's finish LSN, used to check whether the
4906 : * subskiplsn matches it. If they do not match, we raise a warning when
4907 : * clearing the subskiplsn, in order to inform users of cases where, e.g.,
4908 : * they mistakenly specified the wrong subskiplsn.
4909 : */
4910 : static void
4911 1014 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
4912 : {
4913 : Relation rel;
4914 : Form_pg_subscription subform;
4915 : HeapTuple tup;
4916 1014 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
4917 1014 : bool started_tx = false;
4918 :
4919 1014 : if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
4920 1008 : return;
4921 :
4922 6 : if (!IsTransactionState())
4923 : {
4924 2 : StartTransactionCommand();
4925 2 : started_tx = true;
4926 : }
4927 :
4928 : /*
4929 : * Protect subskiplsn of pg_subscription from being concurrently updated
4930 : * while clearing it.
4931 : */
4932 6 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4933 : AccessShareLock);
4934 :
4935 6 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4936 :
4937 : /* Fetch the existing tuple. */
4938 6 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4939 : ObjectIdGetDatum(MySubscription->oid));
4940 :
4941 6 : if (!HeapTupleIsValid(tup))
4942 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4943 :
4944 6 : subform = (Form_pg_subscription) GETSTRUCT(tup);
4945 :
4946 : /*
4947 : * Clear the subskiplsn. If the user has already changed subskiplsn
4948 : * before we clear it, we don't update the catalog, and the replication
4949 : * origin state won't get advanced. So in the worst case, if the server
4950 : * crashes before an acknowledgment of the flush position is sent, the
4951 : * transaction will be sent again and the user will need to set
4952 : * subskiplsn again. We could reduce the possibility by logging a
4953 : * replication origin WAL record to advance the origin LSN instead, but
4954 : * there is no way to advance the origin timestamp, and it doesn't seem
4955 : * worth doing anything about it since it's a very rare case.
4956 : */
4957 6 : if (subform->subskiplsn == myskiplsn)
4958 : {
4959 : bool nulls[Natts_pg_subscription];
4960 : bool replaces[Natts_pg_subscription];
4961 : Datum values[Natts_pg_subscription];
4962 :
4963 6 : memset(values, 0, sizeof(values));
4964 6 : memset(nulls, false, sizeof(nulls));
4965 6 : memset(replaces, false, sizeof(replaces));
4966 :
4967 : /* reset subskiplsn */
4968 6 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4969 6 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4970 :
4971 6 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4972 : replaces);
4973 6 : CatalogTupleUpdate(rel, &tup->t_self, tup);
4974 :
4975 6 : if (myskiplsn != finish_lsn)
4976 0 : ereport(WARNING,
4977 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4978 : errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4979 : LSN_FORMAT_ARGS(finish_lsn),
4980 : LSN_FORMAT_ARGS(myskiplsn)));
4981 : }
4982 :
4983 6 : heap_freetuple(tup);
4984 6 : table_close(rel, NoLock);
4985 :
4986 6 : if (started_tx)
4987 2 : CommitTransactionCommand();
4988 : }
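/*
 * Editor's note (illustrative): the skip-LSN machinery above corresponds to
 * the user-facing command
 *
 *   ALTER SUBSCRIPTION mysub SKIP (lsn = '0/14C0378');
 *
 * (subscription name and LSN are examples only).  The stored subskiplsn is
 * matched against the failing transaction's finish LSN by
 * maybe_start_skipping_changes(), the transaction's changes are then
 * discarded until stop_skipping_changes() runs, and
 * clear_subscription_skip_lsn() resets the catalog value so that the skip
 * applies to exactly one transaction.
 */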
4989 :
4990 : /* Error callback to give more context info about the change being applied */
4991 : void
4992 1382 : apply_error_callback(void *arg)
4993 : {
4994 1382 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
4995 : int elevel;
4996 :
4997 1382 : if (apply_error_callback_arg.command == 0)
4998 662 : return;
4999 :
5000 : Assert(errarg->origin_name);
5001 :
5002 720 : elevel = geterrlevel();
5003 :
5004 : /*
5005 : * Reset the origin state to prevent the origin progress from advancing
5006 : * if we fail to apply. Otherwise, the failure would result in transaction
5007 : * loss, as that transaction won't be sent again by the server.
5008 : */
5009 720 : if (elevel >= ERROR)
5010 60 : replorigin_reset(0, (Datum) 0);
5011 :
5012 720 : if (errarg->rel == NULL)
5013 : {
5014 644 : if (!TransactionIdIsValid(errarg->remote_xid))
5015 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
5016 : errarg->origin_name,
5017 : logicalrep_message_type(errarg->command));
5018 644 : else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5019 534 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
5020 : errarg->origin_name,
5021 : logicalrep_message_type(errarg->command),
5022 : errarg->remote_xid);
5023 : else
5024 220 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
5025 : errarg->origin_name,
5026 : logicalrep_message_type(errarg->command),
5027 : errarg->remote_xid,
5028 110 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5029 : }
5030 : else
5031 : {
5032 76 : if (errarg->remote_attnum < 0)
5033 : {
5034 76 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5035 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
5036 : errarg->origin_name,
5037 : logicalrep_message_type(errarg->command),
5038 0 : errarg->rel->remoterel.nspname,
5039 0 : errarg->rel->remoterel.relname,
5040 : errarg->remote_xid);
5041 : else
5042 152 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
5043 : errarg->origin_name,
5044 : logicalrep_message_type(errarg->command),
5045 76 : errarg->rel->remoterel.nspname,
5046 76 : errarg->rel->remoterel.relname,
5047 : errarg->remote_xid,
5048 76 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5049 : }
5050 : else
5051 : {
5052 0 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
5053 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
5054 : errarg->origin_name,
5055 : logicalrep_message_type(errarg->command),
5056 0 : errarg->rel->remoterel.nspname,
5057 0 : errarg->rel->remoterel.relname,
5058 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
5059 : errarg->remote_xid);
5060 : else
5061 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
5062 : errarg->origin_name,
5063 : logicalrep_message_type(errarg->command),
5064 0 : errarg->rel->remoterel.nspname,
5065 0 : errarg->rel->remoterel.relname,
5066 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
5067 : errarg->remote_xid,
5068 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
5069 : }
5070 : }
5071 : }
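/*
 * Editor's sketch (an assumption about code not shown in this excerpt):
 * apply_error_callback() is an error context callback, so somewhere in the
 * apply loop it is presumably installed and removed with the usual
 * PostgreSQL ErrorContextCallback pattern:
 *
 *   ErrorContextCallback errcallback;
 *
 *   errcallback.callback = apply_error_callback;
 *   errcallback.arg = NULL;
 *   errcallback.previous = error_context_stack;
 *   error_context_stack = &errcallback;
 *
 *   ... apply the incoming change ...
 *
 *   error_context_stack = errcallback.previous;
 *
 * so that any error raised while applying has the errcontext() lines built
 * above appended to its report.
 */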
5072 :
5073 : /* Set transaction information of apply error callback */
5074 : static inline void
5075 5830 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
5076 : {
5077 5830 : apply_error_callback_arg.remote_xid = xid;
5078 5830 : apply_error_callback_arg.finish_lsn = lsn;
5079 5830 : }
5080 :
5081 : /* Reset all information of apply error callback */
5082 : static inline void
5083 2882 : reset_apply_error_context_info(void)
5084 : {
5085 2882 : apply_error_callback_arg.command = 0;
5086 2882 : apply_error_callback_arg.rel = NULL;
5087 2882 : apply_error_callback_arg.remote_attnum = -1;
5088 2882 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
5089 2882 : }
5090 :
5091 : /*
5092 : * Request wakeup of the workers for the given subscription OID
5093 : * at commit of the current transaction.
5094 : *
5095 : * This is used to ensure that the workers notice any changes made by this
5096 : * transaction as soon as possible.
5097 : */
5098 : void
5099 372 : LogicalRepWorkersWakeupAtCommit(Oid subid)
5100 : {
5101 : MemoryContext oldcxt;
5102 :
5103 372 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
5104 372 : on_commit_wakeup_workers_subids =
5105 372 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
5106 372 : MemoryContextSwitchTo(oldcxt);
5107 372 : }
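/*
 * Editor's note (assumption): callers of LogicalRepWorkersWakeupAtCommit()
 * are expected to be commands that modify a subscription (for example
 * ALTER SUBSCRIPTION).  Deferring the wakeup to AtEOXact_LogicalRepWorkers()
 * ensures the workers only see the change once it has been committed.
 */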
5108 :
5109 : /*
5110 : * Wake up the workers of any subscriptions that were changed in this xact.
5111 : */
5112 : void
5113 759638 : AtEOXact_LogicalRepWorkers(bool isCommit)
5114 : {
5115 759638 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
5116 : {
5117 : ListCell *lc;
5118 :
5119 362 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5120 724 : foreach(lc, on_commit_wakeup_workers_subids)
5121 : {
5122 362 : Oid subid = lfirst_oid(lc);
5123 : List *workers;
5124 : ListCell *lc2;
5125 :
5126 362 : workers = logicalrep_workers_find(subid, true, false);
5127 464 : foreach(lc2, workers)
5128 : {
5129 102 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5130 :
5131 102 : logicalrep_worker_wakeup_ptr(worker);
5132 : }
5133 : }
5134 362 : LWLockRelease(LogicalRepWorkerLock);
5135 : }
5136 :
5137 : /* The List storage will be reclaimed automatically in xact cleanup. */
5138 759638 : on_commit_wakeup_workers_subids = NIL;
5139 759638 : }
5140 :
5141 : /*
5142 : * Allocate the origin name in a long-lived context for the error context message.
5143 : */
5144 : void
5145 726 : set_apply_error_context_origin(char *originname)
5146 : {
5147 726 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
5148 : originname);
5149 726 : }
5150 :
5151 : /*
5152 : * Return the action to be taken for the given transaction. See
5153 : * TransApplyAction for information on each of the actions.
5154 : *
5155 : * *winfo is set to the destination parallel worker's info when the leader
5156 : * apply worker has to pass all of the transaction's changes to that
5157 : * parallel apply worker.
5158 : */
5159 : static TransApplyAction
5160 653854 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
5161 : {
5162 653854 : *winfo = NULL;
5163 :
5164 653854 : if (am_parallel_apply_worker())
5165 : {
5166 139224 : return TRANS_PARALLEL_APPLY;
5167 : }
5168 :
5169 : /*
5170 : * If we are processing this transaction using a parallel apply worker,
5171 : * then we either send the changes to that worker or, if the worker is
5172 : * busy, serialize the changes to a file that the parallel worker will
5173 : * process later.
5174 : */
5175 514630 : *winfo = pa_find_worker(xid);
5176 :
5177 514630 : if (*winfo && (*winfo)->serialize_changes)
5178 : {
5179 10074 : return TRANS_LEADER_PARTIAL_SERIALIZE;
5180 : }
5181 504556 : else if (*winfo)
5182 : {
5183 137824 : return TRANS_LEADER_SEND_TO_PARALLEL;
5184 : }
5185 :
5186 : /*
5187 : * If no parallel worker is involved in processing this transaction,
5188 : * then we either apply the change directly or serialize it to a file
5189 : * that will be applied later, when the transaction finish message is
5190 : * processed.
5191 : */
5192 366732 : else if (in_streamed_transaction)
5193 : {
5194 206492 : return TRANS_LEADER_SERIALIZE;
5195 : }
5196 : else
5197 : {
5198 160240 : return TRANS_LEADER_APPLY;
5199 : }
5200 : }
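/*
 * Editor's summary of the decision above (derived directly from the code,
 * added for readability):
 *
 *   running as a parallel apply worker            -> TRANS_PARALLEL_APPLY
 *   leader, parallel worker found, serializing    -> TRANS_LEADER_PARTIAL_SERIALIZE
 *   leader, parallel worker found, queue usable   -> TRANS_LEADER_SEND_TO_PARALLEL
 *   leader, streamed xact, no parallel worker     -> TRANS_LEADER_SERIALIZE
 *   leader, non-streamed xact                     -> TRANS_LEADER_APPLY
 */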
|