Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * applyparallelworker.c
3 : * Support routines for applying xact by parallel apply worker
4 : *
5 : * Copyright (c) 2023-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/applyparallelworker.c
9 : *
10 : * This file contains the code to launch, set up, and teardown a parallel apply
11 : * worker which receives the changes from the leader worker and invokes routines
12 : * to apply those on the subscriber database. Additionally, this file contains
13 : * routines that are intended to support setting up, using, and tearing down a
14 : * ParallelApplyWorkerInfo which is required so the leader worker and parallel
15 : * apply workers can communicate with each other.
16 : *
17 : * The parallel apply workers are assigned (if available) as soon as xact's
18 : * first stream is received for subscriptions that have set their 'streaming'
19 : * option as parallel. The leader apply worker will send changes to this new
20 : * worker via shared memory. We keep this worker assigned till the transaction
21 : * commit is received and also wait for the worker to finish at commit. This
22 : * preserves commit ordering and avoids file I/O in most cases, although we
23 : * still need to spill to a file if there is no worker available. See comments
24 : * atop logical/worker to know more about streamed xacts whose changes are
25 : * spilled to disk. It is important to maintain commit order to avoid failures
26 : * due to: (a) transaction dependencies - say if we insert a row in the first
27 : * transaction and update it in the second transaction on publisher then
28 : * allowing the subscriber to apply both in parallel can lead to failure in the
29 : * update; (b) deadlocks - allowing transactions that update the same set of
30 : * rows/tables in the opposite order to be applied in parallel can lead to
31 : * deadlocks.
32 : *
33 : * A worker pool is used to avoid restarting workers for each streaming
34 : * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
35 : * in the ParallelApplyWorkerPool. After successfully launching a new worker,
36 : * its information is added to the ParallelApplyWorkerPool. Once the worker
37 : * finishes applying the transaction, it is marked as available for re-use.
38 : * Now, before starting a new worker to apply the streaming transaction, we
39 : * check the list for any available worker. Note that we retain a maximum of
40 : * half the max_parallel_apply_workers_per_subscription workers in the pool and
41 : * after that, we simply exit the worker after applying the transaction.
42 : *
43 : * XXX This worker pool threshold is arbitrary and we can provide a GUC
44 : * variable for this in the future if required.
45 : *
46 : * The leader apply worker will create a separate dynamic shared memory segment
47 : * when each parallel apply worker starts. The reason for this design is that
48 : * we cannot predict how many workers will be needed. It may be possible to
49 : * allocate enough shared memory in one segment based on the maximum number of
50 : * parallel apply workers (max_parallel_apply_workers_per_subscription), but
51 : * this would waste memory if no process is actually started.
52 : *
53 : * The dynamic shared memory segment contains: (a) a shm_mq that is used to
54 : * send changes in the transaction from leader apply worker to parallel apply
55 : * worker; (b) another shm_mq that is used to send errors (and other messages
56 : * reported via elog/ereport) from the parallel apply worker to leader apply
57 : * worker; (c) necessary information to be shared among parallel apply workers
58 : * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
59 : *
60 : * Locking Considerations
61 : * ----------------------
62 : * We have a risk of deadlock due to concurrently applying the transactions in
63 : * parallel mode that were independent on the publisher side but became
64 : * dependent on the subscriber side due to the different database structures
65 : * (like schema of subscription tables, constraints, etc.) on each side. This
66 : * can happen even without parallel mode when there are concurrent operations
67 : * on the subscriber. In order to detect the deadlocks among leader (LA) and
68 : * parallel apply (PA) workers, we used lmgr locks when the PA waits for the
69 : * next stream (set of changes) and LA waits for PA to finish the transaction.
70 : * An alternative approach could be to not allow parallelism when the schema of
71 : * tables is different between the publisher and subscriber but that would be
72 : * too restrictive and would require the publisher to send much more
73 : * information than it is currently sending.
74 : *
75 : * Consider a case where the subscribed table does not have a unique key on the
76 : * publisher and has a unique key on the subscriber. The deadlock can happen in
77 : * the following ways:
78 : *
79 : * 1) Deadlock between the leader apply worker and a parallel apply worker
80 : *
81 : * Consider that the parallel apply worker (PA) is executing TX-1 and the
82 : * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
83 : * Now, LA is waiting for PA because of the unique key constraint of the
84 : * subscribed table while PA is waiting for LA to send the next stream of
85 : * changes or transaction finish command message.
86 : *
87 : * In order for lmgr to detect this, we have LA acquire a session lock on the
88 : * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
89 : * trying to receive the next stream of changes. Specifically, LA will acquire
90 : * the lock in AccessExclusive mode before sending the STREAM_STOP and will
91 : * release it if already acquired after sending the STREAM_START, STREAM_ABORT
92 : * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
93 : * acquire the lock in AccessShare mode after processing STREAM_STOP and
94 : * STREAM_ABORT (for subtransaction) and then release the lock immediately
95 : * after acquiring it.
96 : *
97 : * The lock graph for the above example will look as follows:
98 : * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
99 : * acquire the stream lock) -> LA
100 : *
101 : * This way, when PA is waiting for LA for the next stream of changes, we can
102 : * have a wait-edge from PA to LA in lmgr, which will make us detect the
103 : * deadlock between LA and PA.
104 : *
105 : * 2) Deadlock between the leader apply worker and parallel apply workers
106 : *
107 : * This scenario is similar to the first case but TX-1 and TX-2 are executed by
108 : * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
109 : * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
110 : * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
111 : * transaction in order to preserve the commit order. There is a deadlock among
112 : * the three processes.
113 : *
114 : * In order for lmgr to detect this, we have PA acquire a session lock (this is
115 : * a different lock than referred in the previous case, see
116 : * pa_lock_transaction()) on the transaction being applied and have LA wait on
117 : * the lock before proceeding in the transaction finish commands. Specifically,
118 : * PA will acquire this lock in AccessExclusive mode before executing the first
119 : * message of the transaction and release it at the xact end. LA will acquire
120 : * this lock in AccessShare mode at transaction finish commands and release it
121 : * immediately.
122 : *
123 : * The lock graph for the above example will look as follows:
124 : * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
125 : * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
126 : * lock) -> LA
127 : *
128 : * This way when LA is waiting to finish the transaction end command to preserve
129 : * the commit order, we will be able to detect deadlock, if any.
130 : *
131 : * One might think we can use XactLockTableWait(), but XactLockTableWait()
132 : * considers PREPARED TRANSACTION as still in progress which means the lock
133 : * won't be released even after the parallel apply worker has prepared the
134 : * transaction.
135 : *
136 : * 3) Deadlock when the shm_mq buffer is full
137 : *
138 : * In the previous scenario (ie. PA-1 and PA-2 are executing transactions
139 : * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
140 : * wait to send messages, and this wait doesn't appear in lmgr.
141 : *
142 : * To avoid this wait, we use a non-blocking write and wait with a timeout. If
143 : * the timeout is exceeded, the LA will serialize all the pending messages to
144 : * a file and indicate to PA-2 that it needs to read that file for the remaining
145 : * messages. Then LA will start waiting for commit as in the previous case
146 : * which will detect deadlock if any. See pa_send_data() and
147 : * enum TransApplyAction.
148 : *
149 : * Lock types
150 : * ----------
151 : * Both the stream lock and the transaction lock mentioned above are
152 : * session-level locks because both locks could be acquired outside the
153 : * transaction, and the stream lock in the leader needs to persist across
154 : * transaction boundaries i.e. until the end of the streaming transaction.
155 : *-------------------------------------------------------------------------
156 : */
157 :
158 : #include "postgres.h"
159 :
160 : #include "libpq/pqformat.h"
161 : #include "libpq/pqmq.h"
162 : #include "pgstat.h"
163 : #include "postmaster/interrupt.h"
164 : #include "replication/logicallauncher.h"
165 : #include "replication/logicalworker.h"
166 : #include "replication/origin.h"
167 : #include "replication/worker_internal.h"
168 : #include "storage/ipc.h"
169 : #include "storage/latch.h"
170 : #include "storage/lmgr.h"
171 : #include "storage/proc.h"
172 : #include "tcop/tcopprot.h"
173 : #include "utils/inval.h"
174 : #include "utils/memutils.h"
175 : #include "utils/syscache.h"
176 : #include "utils/wait_event.h"
177 :
/* Magic number passed to shm_toc_create() for the parallel apply DSM segment. */
178 : #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
179 :
180 : /*
181 : * DSM keys for parallel apply worker. Unlike other parallel execution code,
182 : * since we don't need to worry about DSM keys conflicting with plan_node_id we
183 : * can use small integers.
 *
 * PARALLEL_APPLY_KEY_SHARED locates the ParallelApplyWorkerShared header,
 * PARALLEL_APPLY_KEY_MQ the queue on which the leader sends changes, and
 * PARALLEL_APPLY_KEY_ERROR_QUEUE the queue on which the leader receives
 * errors and other messages (see pa_setup_dsm).
184 : */
185 : #define PARALLEL_APPLY_KEY_SHARED 1
186 : #define PARALLEL_APPLY_KEY_MQ 2
187 : #define PARALLEL_APPLY_KEY_ERROR_QUEUE 3
188 :
189 : /* Size of the leader-to-worker change queue in the DSM, 16 MB for now. */
190 : #define DSM_QUEUE_SIZE (16 * 1024 * 1024)
191 :
192 : /*
193 : * Error queue size of DSM. It is desirable to make it large enough that a
194 : * typical ErrorResponse can be sent without blocking. That way, a worker that
195 : * errors out can write the whole message into the queue and terminate without
196 : * waiting for the user backend.
197 : */
198 : #define DSM_ERROR_QUEUE_SIZE (16 * 1024)
199 :
200 : /*
201 : * There are three fields in each message received by the parallel apply
202 : * worker: start_lsn, end_lsn and send_time. Because we have updated these
203 : * statistics in the leader apply worker, we can ignore these fields in the
204 : * parallel apply worker (see function LogicalRepApplyLoop).
 *
 * SIZE_STATS_MESSAGE is the number of bytes those three fields occupy; the
 * parallel apply worker advances the message cursor by this amount before
 * dispatching the message.
205 : */
206 : #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
207 :
208 : /*
209 : * The type of session-level lock on a transaction being applied on a logical
210 : * replication subscriber.
 *
 * The two values distinguish the "stream lock" from the "transaction lock"
 * described in the Locking Considerations section atop this file.
211 : */
212 : #define PARALLEL_APPLY_LOCK_STREAM 0
213 : #define PARALLEL_APPLY_LOCK_XACT 1
214 :
215 : /*
216 : * Hash table entry to map xid to the parallel apply worker state.
 *
 * Entries are created by pa_allocate_worker() and removed by
 * pa_free_worker().
217 : */
218 : typedef struct ParallelApplyWorkerEntry
219 : {
220 : TransactionId xid; /* Hash key -- must be first */
221 : ParallelApplyWorkerInfo *winfo; /* worker assigned to apply this xid */
222 : } ParallelApplyWorkerEntry;
223 :
224 : /*
225 : * A hash table used to cache the state of streaming transactions being applied
226 : * by the parallel apply workers.
 *
 * It is keyed by TransactionId, holds ParallelApplyWorkerEntry entries, and
 * is created lazily in ApplyContext by pa_allocate_worker().
227 : */
228 : static HTAB *ParallelApplyTxnHash = NULL;
229 :
230 : /*
231 : * A list (pool) of active parallel apply workers. The information for
232 : * the new worker is added to the list after successfully launching it. The
233 : * list entry is removed if there are already enough workers in the worker
234 : * pool at the end of the transaction. For more information about the worker
235 : * pool, see comments atop this file.
236 : */
237 : static List *ParallelApplyWorkerPool = NIL;
238 :
239 : /*
240 : * Information shared between leader apply worker and parallel apply worker.
241 : */
242 : ParallelApplyWorkerShared *MyParallelShared = NULL;
243 :
244 : /*
245 : * Is there a message sent by a parallel apply worker that the leader apply
246 : * worker needs to receive?
247 : */
248 : volatile sig_atomic_t ParallelApplyMessagePending = false;
249 :
250 : /*
251 : * Cache the parallel apply worker information required for applying the
252 : * current streaming transaction. It is used to save the cost of searching the
253 : * hash table when applying the changes between STREAM_START and STREAM_STOP.
 *
 * While this is non-NULL, pa_find_worker() returns it for any valid xid
 * without consulting the hash table.
254 : */
255 : static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
256 :
257 : /* A list to maintain subtransactions, if any. */
258 : static List *subxactlist = NIL;
259 :
/* Prototypes for static helpers that are called before they are defined. */
260 : static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
261 : static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
262 : static PartialFileSetState pa_get_fileset_state(void);
263 :
264 : /*
265 : * Returns true if it is OK to start a parallel apply worker, false otherwise.
266 : */
267 : static bool
268 86 : pa_can_start(void)
269 : {
270 : /* Only leader apply workers can start parallel apply workers. */
271 86 : if (!am_leader_apply_worker())
272 29 : return false;
273 :
274 : /*
275 : * It is good to check for any change in the subscription parameter to
276 : * avoid the case where for a very long time the change doesn't get
277 : * reflected. This can happen when there is a constant flow of streaming
278 : * transactions that are handled by parallel apply workers.
279 : *
280 : * It is better to do it before the below checks so that the latest values
281 : * of subscription can be used for the checks.
282 : */
283 57 : maybe_reread_subscription();
284 :
285 : /*
286 : * Don't start a new parallel apply worker if the subscription is not
287 : * using parallel streaming mode, or if the publisher does not support
288 : * parallel apply.
289 : */
290 57 : if (!MyLogicalRepWorker->parallel_apply)
291 28 : return false;
292 :
293 : /*
294 : * Don't start a new parallel worker if user has set skiplsn as it's
295 : * possible that they want to skip the streaming transaction. For
296 : * streaming transactions, we need to serialize the transaction to a file
297 : * so that we can get the last LSN of the transaction to judge whether to
298 : * skip before starting to apply the change.
299 : *
300 : * One might think that we could allow parallelism if the first lsn of the
301 : * transaction is greater than skiplsn, but we don't send it with the
302 : * STREAM START message, and it doesn't seem worth sending the extra eight
303 : * bytes with the STREAM START to enable parallelism for this case.
304 : */
305 29 : if (XLogRecPtrIsValid(MySubscription->skiplsn))
306 0 : return false;
307 :
308 : /*
309 : * For streaming transactions that are being applied using a parallel
310 : * apply worker, we cannot decide whether to apply the change for a
311 : * relation that is not in the READY state (see
312 : * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
313 : * time. So, we don't start the new parallel apply worker in this case.
314 : */
315 29 : if (!AllTablesyncsReady())
316 0 : return false;
317 :
318 29 : return true;
319 : }
320 :
321 : /*
322 : * Set up a dynamic shared memory segment.
323 : *
324 : * We set up a control region that contains a fixed-size worker info
325 : * (ParallelApplyWorkerShared), a message queue, and an error queue.
326 : *
327 : * Returns true on success, false on failure.
328 : */
329 : static bool
330 12 : pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
331 : {
332 : shm_toc_estimator e;
333 : Size segsize;
334 : dsm_segment *seg;
335 : shm_toc *toc;
336 : ParallelApplyWorkerShared *shared;
337 : shm_mq *mq;
338 12 : Size queue_size = DSM_QUEUE_SIZE;
339 12 : Size error_queue_size = DSM_ERROR_QUEUE_SIZE;
340 :
341 : /*
342 : * Estimate how much shared memory we need.
343 : *
344 : * Because the TOC machinery may choose to insert padding of oddly-sized
345 : * requests, we must estimate each chunk separately.
346 : *
347 : * We need one key to register the location of the header, and two other
348 : * keys to track the locations of the message queue and the error message
349 : * queue.
350 : */
351 12 : shm_toc_initialize_estimator(&e);
352 12 : shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
353 12 : shm_toc_estimate_chunk(&e, queue_size);
354 12 : shm_toc_estimate_chunk(&e, error_queue_size);
355 :
356 12 : shm_toc_estimate_keys(&e, 3);
357 12 : segsize = shm_toc_estimate(&e);
358 :
359 : /* Create the shared memory segment and establish a table of contents. */
360 12 : seg = dsm_create(shm_toc_estimate(&e), 0);
361 12 : if (!seg)
362 0 : return false;
363 :
364 12 : toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
365 : segsize);
366 :
367 : /* Set up the header region. */
368 12 : shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
369 12 : SpinLockInit(&shared->mutex);
370 :
371 12 : shared->xact_state = PARALLEL_TRANS_UNKNOWN;
372 12 : pg_atomic_init_u32(&(shared->pending_stream_count), 0);
373 12 : shared->last_commit_end = InvalidXLogRecPtr;
374 12 : shared->fileset_state = FS_EMPTY;
375 :
376 12 : shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
377 :
378 : /* Set up message queue for the worker. */
379 12 : mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
380 12 : shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
381 12 : shm_mq_set_sender(mq, MyProc);
382 :
383 : /* Attach the queue. */
384 12 : winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
385 :
386 : /* Set up error queue for the worker. */
387 12 : mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
388 : error_queue_size);
389 12 : shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
390 12 : shm_mq_set_receiver(mq, MyProc);
391 :
392 : /* Attach the queue. */
393 12 : winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
394 :
395 : /* Return results to caller. */
396 12 : winfo->dsm_seg = seg;
397 12 : winfo->shared = shared;
398 :
399 12 : return true;
400 : }
401 :
402 : /*
403 : * Try to get a parallel apply worker from the pool. If none is available then
404 : * start a new one.
405 : */
406 : static ParallelApplyWorkerInfo *
407 29 : pa_launch_parallel_worker(void)
408 : {
409 : MemoryContext oldcontext;
410 : bool launched;
411 : ParallelApplyWorkerInfo *winfo;
412 : ListCell *lc;
413 :
414 : /* Try to get an available parallel apply worker from the worker pool. */
415 31 : foreach(lc, ParallelApplyWorkerPool)
416 : {
417 19 : winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
418 :
419 19 : if (!winfo->in_use)
420 17 : return winfo;
421 : }
422 :
423 : /*
424 : * Start a new parallel apply worker.
425 : *
426 : * The worker info can be used for the lifetime of the worker process, so
427 : * create it in a permanent context.
428 : */
429 12 : oldcontext = MemoryContextSwitchTo(ApplyContext);
430 :
431 12 : winfo = palloc0_object(ParallelApplyWorkerInfo);
432 :
433 : /* Setup shared memory. */
434 12 : if (!pa_setup_dsm(winfo))
435 : {
436 0 : MemoryContextSwitchTo(oldcontext);
437 0 : pfree(winfo);
438 0 : return NULL;
439 : }
440 :
441 12 : launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
442 12 : MyLogicalRepWorker->dbid,
443 12 : MySubscription->oid,
444 12 : MySubscription->name,
445 12 : MyLogicalRepWorker->userid,
446 : InvalidOid,
447 : dsm_segment_handle(winfo->dsm_seg),
448 : false);
449 :
450 12 : if (launched)
451 : {
452 12 : ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
453 : }
454 : else
455 : {
456 0 : pa_free_worker_info(winfo);
457 0 : winfo = NULL;
458 : }
459 :
460 12 : MemoryContextSwitchTo(oldcontext);
461 :
462 12 : return winfo;
463 : }
464 :
465 : /*
466 : * Allocate a parallel apply worker that will be used for the specified xid.
467 : *
468 : * We first try to get an available worker from the pool, if any and then try
469 : * to launch a new worker. On successful allocation, remember the worker
470 : * information in the hash table so that we can get it later for processing the
471 : * streaming changes.
472 : */
473 : void
474 86 : pa_allocate_worker(TransactionId xid)
475 : {
476 : bool found;
477 86 : ParallelApplyWorkerInfo *winfo = NULL;
478 : ParallelApplyWorkerEntry *entry;
479 :
480 86 : if (!pa_can_start())
481 57 : return;
482 :
483 29 : winfo = pa_launch_parallel_worker();
484 29 : if (!winfo)
485 0 : return;
486 :
487 : /* First time through, initialize parallel apply worker state hashtable. */
488 29 : if (!ParallelApplyTxnHash)
489 : {
490 : HASHCTL ctl;
491 :
492 117 : MemSet(&ctl, 0, sizeof(ctl));
493 9 : ctl.keysize = sizeof(TransactionId);
494 9 : ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
495 9 : ctl.hcxt = ApplyContext;
496 :
497 9 : ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
498 : 16, &ctl,
499 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
500 : }
501 :
502 : /* Create an entry for the requested transaction. */
503 29 : entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
504 29 : if (found)
505 0 : elog(ERROR, "hash table corrupted");
506 :
507 : /* Update the transaction information in shared memory. */
508 29 : SpinLockAcquire(&winfo->shared->mutex);
509 29 : winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
510 29 : winfo->shared->xid = xid;
511 29 : SpinLockRelease(&winfo->shared->mutex);
512 :
513 29 : winfo->in_use = true;
514 29 : winfo->serialize_changes = false;
515 29 : entry->winfo = winfo;
516 : }
517 :
518 : /*
519 : * Find the assigned worker for the given transaction, if any.
520 : */
521 : ParallelApplyWorkerInfo *
522 257455 : pa_find_worker(TransactionId xid)
523 : {
524 : bool found;
525 : ParallelApplyWorkerEntry *entry;
526 :
527 257455 : if (!TransactionIdIsValid(xid))
528 80272 : return NULL;
529 :
530 177183 : if (!ParallelApplyTxnHash)
531 103240 : return NULL;
532 :
533 : /* Return the cached parallel apply worker if valid. */
534 73943 : if (stream_apply_worker)
535 73651 : return stream_apply_worker;
536 :
537 : /* Find an entry for the requested transaction. */
538 292 : entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
539 292 : if (found)
540 : {
541 : /* The worker must not have exited. */
542 : Assert(entry->winfo->in_use);
543 292 : return entry->winfo;
544 : }
545 :
546 0 : return NULL;
547 : }
548 :
549 : /*
550 : * Makes the worker available for reuse.
551 : *
552 : * This removes the parallel apply worker entry from the hash table so that it
553 : * can't be used. If there are enough workers in the pool, it stops the worker
554 : * and frees the corresponding info. Otherwise it just marks the worker as
555 : * available for reuse.
556 : *
557 : * For more information about the worker pool, see comments atop this file.
558 : */
559 : static void
560 25 : pa_free_worker(ParallelApplyWorkerInfo *winfo)
561 : {
562 : Assert(!am_parallel_apply_worker());
563 : Assert(winfo->in_use);
564 : Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
565 :
566 25 : if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
567 0 : elog(ERROR, "hash table corrupted");
568 :
569 : /*
570 : * Stop the worker if there are enough workers in the pool.
571 : *
572 : * XXX Additionally, we also stop the worker if the leader apply worker
573 : * serialize part of the transaction data due to a send timeout. This is
574 : * because the message could be partially written to the queue and there
575 : * is no way to clean the queue other than resending the message until it
576 : * succeeds. Instead of trying to send the data which anyway would have
577 : * been serialized and then letting the parallel apply worker deal with
578 : * the spurious message, we stop the worker.
579 : */
580 25 : if (winfo->serialize_changes ||
581 21 : list_length(ParallelApplyWorkerPool) >
582 21 : (max_parallel_apply_workers_per_subscription / 2))
583 : {
584 5 : logicalrep_pa_worker_stop(winfo);
585 5 : pa_free_worker_info(winfo);
586 :
587 5 : return;
588 : }
589 :
590 20 : winfo->in_use = false;
591 20 : winfo->serialize_changes = false;
592 : }
593 :
594 : /*
595 : * Free the parallel apply worker information and unlink the files with
596 : * serialized changes if any.
597 : */
598 : static void
599 5 : pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
600 : {
601 : Assert(winfo);
602 :
603 5 : if (winfo->mq_handle)
604 5 : shm_mq_detach(winfo->mq_handle);
605 :
606 5 : if (winfo->error_mq_handle)
607 0 : shm_mq_detach(winfo->error_mq_handle);
608 :
609 : /* Unlink the files with serialized changes. */
610 5 : if (winfo->serialize_changes)
611 4 : stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
612 :
613 5 : if (winfo->dsm_seg)
614 5 : dsm_detach(winfo->dsm_seg);
615 :
616 : /* Remove from the worker pool. */
617 5 : ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
618 :
619 5 : pfree(winfo);
620 5 : }
621 :
622 : /*
623 : * Detach the error queue for all parallel apply workers.
624 : */
625 : void
626 354 : pa_detach_all_error_mq(void)
627 : {
628 : ListCell *lc;
629 :
630 361 : foreach(lc, ParallelApplyWorkerPool)
631 : {
632 7 : ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
633 :
634 7 : if (winfo->error_mq_handle)
635 : {
636 7 : shm_mq_detach(winfo->error_mq_handle);
637 7 : winfo->error_mq_handle = NULL;
638 : }
639 : }
640 354 : }
641 :
642 : /*
643 : * Check if there are any pending spooled messages.
644 : */
645 : static bool
646 16 : pa_has_spooled_message_pending(void)
647 : {
648 : PartialFileSetState fileset_state;
649 :
650 16 : fileset_state = pa_get_fileset_state();
651 :
652 16 : return (fileset_state != FS_EMPTY);
653 : }
654 :
655 : /*
656 : * Replay the spooled messages once the leader apply worker has finished
657 : * serializing changes to the file.
658 : *
659 : * Returns false if there aren't any pending spooled messages, true otherwise.
660 : */
661 : static bool
662 56 : pa_process_spooled_messages_if_required(void)
663 : {
664 : PartialFileSetState fileset_state;
665 :
666 56 : fileset_state = pa_get_fileset_state();
667 :
668 56 : if (fileset_state == FS_EMPTY)
669 48 : return false;
670 :
671 : /*
672 : * If the leader apply worker is busy serializing the partial changes then
673 : * acquire the stream lock now and wait for the leader worker to finish
674 : * serializing the changes. Otherwise, the parallel apply worker won't get
675 : * a chance to receive a STREAM_STOP (and acquire the stream lock) until
676 : * the leader had serialized all changes which can lead to undetected
677 : * deadlock.
678 : *
679 : * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
680 : * worker has finished serializing the changes.
681 : */
682 8 : if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
683 : {
684 0 : pa_lock_stream(MyParallelShared->xid, AccessShareLock);
685 0 : pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
686 :
687 0 : fileset_state = pa_get_fileset_state();
688 : }
689 :
690 : /*
691 : * We cannot read the file immediately after the leader has serialized all
692 : * changes to the file because there may still be messages in the memory
693 : * queue. We will apply all spooled messages the next time we call this
694 : * function and that will ensure there are no messages left in the memory
695 : * queue.
696 : */
697 8 : if (fileset_state == FS_SERIALIZE_DONE)
698 : {
699 4 : pa_set_fileset_state(MyParallelShared, FS_READY);
700 : }
701 4 : else if (fileset_state == FS_READY)
702 : {
703 4 : apply_spooled_messages(&MyParallelShared->fileset,
704 4 : MyParallelShared->xid,
705 : InvalidXLogRecPtr);
706 4 : pa_set_fileset_state(MyParallelShared, FS_EMPTY);
707 : }
708 :
709 8 : return true;
710 : }
711 :
712 : /*
713 : * Interrupt handler for main loop of parallel apply worker.
714 : */
715 : static void
716 64133 : ProcessParallelApplyInterrupts(void)
717 : {
718 64133 : CHECK_FOR_INTERRUPTS();
719 :
720 64129 : if (ShutdownRequestPending)
721 : {
722 5 : ereport(LOG,
723 : (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
724 : MySubscription->name)));
725 :
726 5 : proc_exit(0);
727 : }
728 :
729 64124 : if (ConfigReloadPending)
730 : {
731 4 : ConfigReloadPending = false;
732 4 : ProcessConfigFile(PGC_SIGHUP);
733 : }
734 64124 : }
735 :
736 : /* Parallel apply worker main loop. */
737 : static void
738 12 : LogicalParallelApplyLoop(shm_mq_handle *mqh)
739 : {
740 : shm_mq_result shmq_res;
741 : ErrorContextCallback errcallback;
742 12 : MemoryContext oldcxt = CurrentMemoryContext;
743 :
744 : /*
745 : * Init the ApplyMessageContext which we clean up after each replication
746 : * protocol message.
747 : */
748 12 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
749 : "ApplyMessageContext",
750 : ALLOCSET_DEFAULT_SIZES);
751 :
752 : /*
753 : * Push apply error context callback. Fields will be filled while applying
754 : * a change.
755 : */
756 12 : errcallback.callback = apply_error_callback;
757 12 : errcallback.previous = error_context_stack;
758 12 : error_context_stack = &errcallback;
759 :
760 : for (;;)
761 64121 : {
762 : void *data;
763 : Size len;
764 :
765 64133 : ProcessParallelApplyInterrupts();
766 :
767 : /* Ensure we are reading the data into our memory context. */
768 64124 : MemoryContextSwitchTo(ApplyMessageContext);
769 :
770 64124 : shmq_res = shm_mq_receive(mqh, &len, &data, true);
771 :
772 64124 : if (shmq_res == SHM_MQ_SUCCESS)
773 : {
774 : StringInfoData s;
775 : int c;
776 :
777 64068 : if (len == 0)
778 0 : elog(ERROR, "invalid message length");
779 :
780 64068 : initReadOnlyStringInfo(&s, data, len);
781 :
782 : /*
783 : * The first byte of messages sent from leader apply worker to
784 : * parallel apply workers can only be PqReplMsg_WALData.
785 : */
786 64068 : c = pq_getmsgbyte(&s);
787 64068 : if (c != PqReplMsg_WALData)
788 0 : elog(ERROR, "unexpected message \"%c\"", c);
789 :
790 : /*
791 : * Ignore statistics fields that have been updated by the leader
792 : * apply worker.
793 : *
794 : * XXX We can avoid sending the statistics fields from the leader
795 : * apply worker but for that, it needs to rebuild the entire
796 : * message by removing these fields which could be more work than
797 : * simply ignoring these fields in the parallel apply worker.
798 : */
799 64068 : s.cursor += SIZE_STATS_MESSAGE;
800 :
801 64068 : apply_dispatch(&s);
802 : }
803 56 : else if (shmq_res == SHM_MQ_WOULD_BLOCK)
804 : {
805 : /* Replay the changes from the file, if any. */
806 56 : if (!pa_process_spooled_messages_if_required())
807 : {
808 : int rc;
809 :
810 : /* Wait for more work. */
811 48 : rc = WaitLatch(MyLatch,
812 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
813 : 1000L,
814 : WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
815 :
816 48 : if (rc & WL_LATCH_SET)
817 43 : ResetLatch(MyLatch);
818 : }
819 : }
820 : else
821 : {
822 : Assert(shmq_res == SHM_MQ_DETACHED);
823 :
824 0 : ereport(ERROR,
825 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
826 : errmsg("lost connection to the logical replication apply worker")));
827 : }
828 :
829 64121 : MemoryContextReset(ApplyMessageContext);
830 64121 : MemoryContextSwitchTo(oldcxt);
831 : }
832 :
833 : /* Pop the error context stack. */
834 : error_context_stack = errcallback.previous;
835 :
836 : MemoryContextSwitchTo(oldcxt);
837 : }
838 :
839 : /*
    : * Exit callback of a parallel apply worker; ParallelApplyWorkerMain registers
    : * it with before_shmem_exit().
    : *
840 : * Make sure the leader apply worker tries to read from our error queue one more
841 : * time. This guards against the case where we exit uncleanly without sending
842 : * an ErrorResponse, for example because some code calls proc_exit directly.
843 : *
844 : * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
845 : * if any. See ParallelWorkerShutdown for details.
    : *
    : * 'code' is not used here. 'arg' carries the dsm_segment pointer that was
    : * supplied when the callback was registered.
846 : */
847 : static void
848 12 : pa_shutdown(int code, Datum arg)
849 : {
850 12 : SendProcSignal(MyLogicalRepWorker->leader_pid,
851 : PROCSIG_PARALLEL_APPLY_MESSAGE,
852 : INVALID_PROC_NUMBER);
853 :
854 12 : dsm_detach((dsm_segment *) DatumGetPointer(arg));
855 12 : }
856 :
857 : /*
858 : * Parallel apply worker entry point.
    : *
    : * main_arg is the logical replication worker slot number (decoded with
    : * DatumGetInt32). The handle of the DSM segment to attach to is copied from
    : * MyBgworkerEntry->bgw_extra.
    : *
    : * The function attaches to the DSM segment, to the data queue (as receiver)
    : * and to the error queue (as sender), sets up the replication origin using
    : * the leader's pid, and finally enters LogicalParallelApplyLoop(). Control is
    : * not expected to come back from that loop.
859 : */
860 : void
861 12 : ParallelApplyWorkerMain(Datum main_arg)
862 : {
863 : ParallelApplyWorkerShared *shared;
864 : dsm_handle handle;
865 : dsm_segment *seg;
866 : shm_toc *toc;
867 : shm_mq *mq;
868 : shm_mq_handle *mqh;
869 : shm_mq_handle *error_mqh;
870 : ReplOriginId originid;
871 12 : int worker_slot = DatumGetInt32(main_arg);
872 : char originname[NAMEDATALEN];
873 :
874 12 : InitializingApplyWorker = true;
875 :
876 : /*
877 : * Setup signal handling.
878 : *
879 : * Note: We intentionally use SIGUSR2 to trigger a graceful shutdown
880 : * initiated by the leader apply worker. This helps to differentiate it
881 : * from the case where we abort the current transaction and exit on
882 : * receiving SIGTERM.
883 : */
884 12 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
885 12 : pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
886 12 : BackgroundWorkerUnblockSignals();
887 :
888 : /*
889 : * Attach to the dynamic shared memory segment for the parallel apply, and
890 : * find its table of contents.
891 : *
892 : * Like parallel query, we don't need resource owner by this time. See
893 : * ParallelWorkerMain.
894 : */
895 12 : memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
896 12 : seg = dsm_attach(handle);
897 12 : if (!seg)
898 0 : ereport(ERROR,
899 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
900 : errmsg("could not map dynamic shared memory segment")));
901 :
902 12 : toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
903 12 : if (!toc)
904 0 : ereport(ERROR,
905 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
906 : errmsg("invalid magic number in dynamic shared memory segment")));
907 :
908 : /* Look up the shared information and remember it in MyParallelShared. */
909 12 : shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
910 12 : MyParallelShared = shared;
911 :
912 : /*
913 : * Attach to the message queue as its receiver. This is the queue handed
    : * to LogicalParallelApplyLoop() below.
914 : */
915 12 : mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
916 12 : shm_mq_set_receiver(mq, MyProc);
917 12 : mqh = shm_mq_attach(mq, seg, NULL);
918 :
919 : /*
920 : * Primary initialization is complete. Now, we can attach to our slot.
921 : * This is to ensure that the leader apply worker does not write data to
922 : * the uninitialized memory queue.
923 : */
924 12 : logicalrep_worker_attach(worker_slot);
925 :
926 : /*
927 : * Register the shutdown callback after we are attached to the worker
928 : * slot. This is to ensure that MyLogicalRepWorker remains valid when this
929 : * callback is invoked.
930 : */
931 12 : before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
932 :
933 12 : SpinLockAcquire(&MyParallelShared->mutex);
934 12 : MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
935 12 : MyParallelShared->logicalrep_worker_slot_no = worker_slot;
936 12 : SpinLockRelease(&MyParallelShared->mutex);
937 :
938 : /*
939 : * Attach to the error queue as its sender and redirect pq output to it
    : * (pq_redirect_to_shm_mq), naming the leader as the process to notify.
940 : */
941 12 : mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
942 12 : shm_mq_set_sender(mq, MyProc);
943 12 : error_mqh = shm_mq_attach(mq, seg, NULL);
944 :
945 12 : pq_redirect_to_shm_mq(seg, error_mqh);
946 12 : pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
947 : INVALID_PROC_NUMBER);
948 :
949 12 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
950 12 : MyLogicalRepWorker->reply_time = 0;
951 :
952 12 : InitializeLogRepWorker();
953 :
954 12 : InitializingApplyWorker = false;
955 :
956 : /* Setup replication origin tracking; the catalog lookup needs a transaction. */
957 12 : StartTransactionCommand();
958 12 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
959 : originname, sizeof(originname));
960 12 : originid = replorigin_by_name(originname, false);
961 :
962 : /*
963 : * The parallel apply worker doesn't need to monopolize this replication
964 : * origin which was already acquired by its leader process.
965 : */
966 12 : replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
967 12 : replorigin_xact_state.origin = originid;
968 12 : CommitTransactionCommand();
969 :
970 : /*
971 : * Setup callback for syscache so that we know when something changes in
972 : * the subscription relation state.
973 : */
974 12 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
975 : InvalidateSyncingRelStates,
976 : (Datum) 0);
977 :
978 12 : set_apply_error_context_origin(originname);
979 :
980 12 : LogicalParallelApplyLoop(mqh);
981 :
982 : /*
983 : * The parallel apply worker must not get here because the parallel apply
984 : * worker will only stop when it receives a SIGTERM or SIGUSR2 from the
985 : * leader, or SIGINT from itself, or when there is an error. None of these
986 : * cases will allow the code to reach here.
987 : */
988 : Assert(false);
989 0 : }
990 :
991 : /*
992 : * Handle receipt of an interrupt indicating a parallel apply worker message.
993 : *
994 : * Note: this is called within a signal handler! All we can do is set a flag
995 : * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
996 : * ProcessParallelApplyMessages().
    : *
    : * Concretely this sets InterruptPending and ParallelApplyMessagePending, and
    : * then sets the process latch (MyLatch).
997 : */
998 : void
999 13 : HandleParallelApplyMessageInterrupt(void)
1000 : {
1001 13 : InterruptPending = true;
1002 13 : ParallelApplyMessagePending = true;
1003 13 : SetLatch(MyLatch);
1004 13 : }
1005 :
1006 : /*
1007 : * Process a single protocol message received from a single parallel apply
1008 : * worker.
     : *
     : * 'msg' holds the raw message; its first byte is the message type.
     : *
     : * ErrorResponse: the error is parsed and re-raised in this process as an
     : * ERROR, with the worker's context text appended. NoticeResponse and
     : * NotificationResponse: ignored. Any other type: reported as an ERROR.
1009 : */
1010 : static void
1011 2 : ProcessParallelApplyMessage(StringInfo msg)
1012 : {
1013 : char msgtype;
1014 :
1015 2 : msgtype = pq_getmsgbyte(msg);
1016 :
1017 2 : switch (msgtype)
1018 : {
1019 2 : case PqMsg_ErrorResponse:
1020 : {
1021 : ErrorData edata;
1022 :
1023 : /* Parse ErrorResponse. */
1024 2 : pq_parse_errornotice(msg, &edata);
1025 :
1026 : /*
1027 : * If desired, add a context line to show that this is a
1028 : * message propagated from a parallel apply worker. Otherwise,
1029 : * it can sometimes be confusing to understand what actually
1030 : * happened.
1031 : */
1032 2 : if (edata.context)
1033 2 : edata.context = psprintf("%s\n%s", edata.context,
1034 : _("logical replication parallel apply worker"));
1035 : else
1036 0 : edata.context = pstrdup(_("logical replication parallel apply worker"));
1037 :
1038 : /*
1039 : * Context beyond that should use the error context callbacks
1040 : * that were in effect in LogicalRepApplyLoop().
1041 : */
1042 2 : error_context_stack = apply_error_context_stack;
1043 :
1044 : /*
1045 : * The actual error must have been reported by the parallel
1046 : * apply worker.
     : *
     : * There is no "break" after this ereport(ERROR): it is not
     : * expected to return, so control does not fall through into
     : * the next case.
1047 : */
1048 2 : ereport(ERROR,
1049 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1050 : errmsg("logical replication parallel apply worker exited due to error"),
1051 : errcontext("%s", edata.context)));
1052 : }
1053 :
1054 : /*
1055 : * Don't need to do anything about NoticeResponse and
1056 : * NotificationResponse as the logical replication worker doesn't
1057 : * need to send messages to the client.
1058 : */
1059 0 : case PqMsg_NoticeResponse:
1060 : case PqMsg_NotificationResponse:
1061 0 : break;
1062 :
1063 0 : default:
1064 0 : elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1065 : msgtype, msg->len);
1066 : }
1067 0 : }
1068 :
1069 : /*
1070 : * Handle any queued protocol messages received from parallel apply workers.
     : *
     : * Walks ParallelApplyWorkerPool and, for every worker that still has an
     : * error queue handle, performs one non-blocking receive on that queue. A
     : * received message is passed to ProcessParallelApplyMessage(); a detached
     : * queue is reported as an ERROR.
1071 : */
1072 : void
1073 7 : ProcessParallelApplyMessages(void)
1074 : {
1075 : ListCell *lc;
1076 : MemoryContext oldcontext;
1077 :
1078 : static MemoryContext hpam_context = NULL;
1079 :
1080 : /*
1081 : * This is invoked from ProcessInterrupts(), and since some of the
1082 : * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1083 : * for recursive calls if more signals are received while this runs. It's
1084 : * unclear that recursive entry would be safe, and it doesn't seem useful
1085 : * even if it is safe, so let's block interrupts until done.
1086 : */
1087 7 : HOLD_INTERRUPTS();
1088 :
1089 : /*
1090 : * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1091 : * don't want to risk leaking data into long-lived contexts, so let's do
1092 : * our work here in a private context that we can reset on each use.
1093 : */
1094 7 : if (!hpam_context) /* first time through? */
1095 6 : hpam_context = AllocSetContextCreate(TopMemoryContext,
1096 : "ProcessParallelApplyMessages",
1097 : ALLOCSET_DEFAULT_SIZES);
1098 : else
1099 1 : MemoryContextReset(hpam_context);
1100 :
1101 7 : oldcontext = MemoryContextSwitchTo(hpam_context);
1102 :
1103 7 : ParallelApplyMessagePending = false;
1104 :
1105 13 : foreach(lc, ParallelApplyWorkerPool)
1106 : {
1107 : shm_mq_result res;
1108 : Size nbytes;
1109 : void *data;
1110 8 : ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
1111 :
1112 : /*
1113 : * The leader will detach from the error queue and set it to NULL
1114 : * before preparing to stop all parallel apply workers, so we don't
1115 : * need to handle error messages anymore. See
1116 : * logicalrep_worker_detach.
1117 : */
1118 8 : if (!winfo->error_mq_handle)
1119 6 : continue;
1120 :
1121 3 : res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
1122 :
1123 3 : if (res == SHM_MQ_WOULD_BLOCK)
1124 1 : continue;
1125 2 : else if (res == SHM_MQ_SUCCESS)
1126 : {
1127 : StringInfoData msg;
1128 :
1129 2 : initStringInfo(&msg);
1130 2 : appendBinaryStringInfo(&msg, data, nbytes);
1131 2 : ProcessParallelApplyMessage(&msg);
1132 0 : pfree(msg.data);
1133 : }
1134 : else
1135 0 : ereport(ERROR,
1136 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1137 : errmsg("lost connection to the logical replication parallel apply worker")));
1138 : }
1139 :
1140 5 : MemoryContextSwitchTo(oldcontext);
1141 :
1142 : /* Might as well clear the context on our way out */
1143 5 : MemoryContextReset(hpam_context);
1144 :
1145 5 : RESUME_INTERRUPTS();
1146 5 : }
1147 :
1148 : /*
1149 : * Send the data to the specified parallel apply worker via shared-memory
1150 : * queue.
1151 : *
     : * 'winfo' identifies the target worker (its mq_handle is used); 'data' and
     : * 'nbytes' describe the message to enqueue.
     : *
1152 : * Returns true once the message has been queued. Returns false if the attempt
1153 : * to send data via shared memory times out, and also right away when
     : * debug_logical_replication_streaming is set to the 'immediate' mode.
     : * Raises an ERROR if the queue turns out to be detached.
1154 : */
1155 : bool
1156 68906 : pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
1157 : {
1158 : int rc;
1159 : shm_mq_result result;
1160 68906 : TimestampTz startTime = 0;
1161 :
1162 : Assert(!IsTransactionState());
1163 : Assert(!winfo->serialize_changes);
1164 :
1165 : /*
1166 : * We don't try to send data to parallel worker for 'immediate' mode. This
1167 : * is primarily used for testing purposes.
1168 : */
1169 68906 : if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
1170 4 : return false;
1171 :
1172 : /*
1173 : * This timeout is a bit arbitrary but testing revealed that it is sufficient
1174 : * to send the message unless the parallel apply worker is waiting on some
1175 : * lock or there is a serious resource crunch. See the comments atop this file
1176 : * to know why we are using a non-blocking way to send the message.
     : *
     : * Note that startTime is only taken after the first retry wait below,
     : * which is presumably why one retry interval is subtracted from the
     : * timeout.
1177 : */
1178 : #define SHM_SEND_RETRY_INTERVAL_MS 1000
1179 : #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1180 :
1181 : for (;;)
1182 : {
1183 68902 : result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
1184 :
1185 68902 : if (result == SHM_MQ_SUCCESS)
1186 68902 : return true;
1187 0 : else if (result == SHM_MQ_DETACHED)
1188 0 : ereport(ERROR,
1189 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1190 : errmsg("could not send data to shared-memory queue")));
1191 :
1192 : Assert(result == SHM_MQ_WOULD_BLOCK);
1193 :
1194 : /* Wait before retrying; interrupts are checked only if the latch was set. */
1195 0 : rc = WaitLatch(MyLatch,
1196 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1197 : SHM_SEND_RETRY_INTERVAL_MS,
1198 : WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
1199 :
1200 0 : if (rc & WL_LATCH_SET)
1201 : {
1202 0 : ResetLatch(MyLatch);
1203 0 : CHECK_FOR_INTERRUPTS();
1204 : }
1205 :
1206 0 : if (startTime == 0)
1207 0 : startTime = GetCurrentTimestamp();
1208 0 : else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
1209 : SHM_SEND_TIMEOUT_MS))
1210 0 : return false;
1211 : }
1212 : }
1213 :
1214 : /*
1215 : * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
1216 : * that the current data and any subsequent data for this transaction will be
1217 : * serialized to a file. This is done to prevent possible deadlocks with
1218 : * another parallel apply worker (refer to the comments atop this file).
     : *
     : * 'winfo' is the worker whose transaction is being switched. 'stream_locked'
     : * tells whether the caller already holds the stream lock; if not, it is
     : * acquired here in AccessExclusiveLock mode.
     : *
     : * On return winfo->serialize_changes is true and the shared fileset state
     : * is FS_SERIALIZE_IN_PROGRESS.
1219 : */
1220 : void
1221 4 : pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
1222 : bool stream_locked)
1223 : {
1224 4 : ereport(LOG,
1225 : (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1226 : winfo->shared->xid)));
1227 :
1228 : /*
1229 : * The parallel apply worker could be stuck for some reason (say waiting
1230 : * on some lock by other backend), so stop trying to send data directly to
1231 : * it and start serializing data to the file instead.
1232 : */
1233 4 : winfo->serialize_changes = true;
1234 :
1235 : /* Initialize the stream fileset. */
1236 4 : stream_start_internal(winfo->shared->xid, true);
1237 :
1238 : /*
1239 : * Acquire the stream lock, if not already held, to make sure that the
1240 : * parallel apply worker will wait for the leader to release the stream
1241 : * lock until the end of the transaction.
1242 : */
1243 4 : if (!stream_locked)
1244 4 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1245 :
1246 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
1247 4 : }
1248 :
1249 : /*
1250 : * Wait until the parallel apply worker's transaction state has reached or
1251 : * exceeded the given xact_state.
     : *
     : * The state is re-read on every iteration. Between reads the function waits
     : * on the process latch with a timeout argument of 10, so the state is polled
     : * even when nobody sets the latch. Interrupts are checked after each wait.
1252 : */
1253 : static void
1254 27 : pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
1255 : ParallelTransState xact_state)
1256 : {
1257 : for (;;)
1258 : {
1259 : /*
1260 : * Stop if the transaction state has reached or exceeded the given
1261 : * xact_state.
1262 : */
1263 368 : if (pa_get_xact_state(winfo->shared) >= xact_state)
1264 27 : break;
1265 :
1266 : /* Wait to be signalled, or for the timeout to expire. */
1267 341 : (void) WaitLatch(MyLatch,
1268 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1269 : 10L,
1270 : WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
1271 :
1272 : /* Reset the latch so we don't spin. */
1273 341 : ResetLatch(MyLatch);
1274 :
1275 : /* An interrupt may have occurred while we were waiting. */
1276 341 : CHECK_FOR_INTERRUPTS();
1277 : }
1278 27 : }
1279 :
1280 : /*
1281 : * Wait until the parallel apply worker's transaction finishes.
     : *
     : * Raises an ERROR if, once the transaction lock could be taken, the worker's
     : * state is not PARALLEL_TRANS_FINISHED.
1282 : */
1283 : static void
1284 27 : pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
1285 : {
1286 : /*
1287 : * Wait until the parallel apply worker sets the state to
1288 : * PARALLEL_TRANS_STARTED which means it has acquired the transaction
1289 : * lock. This is to prevent leader apply worker from acquiring the
1290 : * transaction lock earlier than the parallel apply worker.
1291 : */
1292 27 : pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
1293 :
1294 : /*
1295 : * Wait for the transaction lock to be released. This is required to
1296 : * detect deadlock among leader and parallel apply workers. Refer to the
1297 : * comments atop this file.
     : *
     : * The lock itself is not needed afterwards, hence the immediate unlock.
1298 : */
1299 27 : pa_lock_transaction(winfo->shared->xid, AccessShareLock);
1300 25 : pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
1301 :
1302 : /*
1303 : * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
1304 : * apply worker failed while applying changes causing the lock to be
1305 : * released.
1306 : */
1307 25 : if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
1308 0 : ereport(ERROR,
1309 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1310 : errmsg("lost connection to the logical replication parallel apply worker")));
1311 25 : }
1312 :
1313 : /*
1314 : * Set the transaction state for a given parallel apply worker.
     : *
     : * The xact_state field of 'wshared' is written while holding wshared->mutex.
1315 : */
1316 : void
1317 54 : pa_set_xact_state(ParallelApplyWorkerShared *wshared,
1318 : ParallelTransState xact_state)
1319 : {
1320 54 : SpinLockAcquire(&wshared->mutex);
1321 54 : wshared->xact_state = xact_state;
1322 54 : SpinLockRelease(&wshared->mutex);
1323 54 : }
1324 :
1325 : /*
1326 : * Get the transaction state for a given parallel apply worker.
     : *
     : * The xact_state field of 'wshared' is read while holding wshared->mutex and
     : * the value seen at that moment is returned.
1327 : */
1328 : static ParallelTransState
1329 393 : pa_get_xact_state(ParallelApplyWorkerShared *wshared)
1330 : {
1331 : ParallelTransState xact_state;
1332 :
1333 393 : SpinLockAcquire(&wshared->mutex);
1334 393 : xact_state = wshared->xact_state;
1335 393 : SpinLockRelease(&wshared->mutex);
1336 :
1337 393 : return xact_state;
1338 : }
1339 :
1340 : /*
1341 : * Cache the parallel apply worker information.
     : *
     : * Stores 'winfo' in the stream_apply_worker variable, replacing whatever was
     : * cached before.
1342 : */
1343 : void
1344 510 : pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
1345 : {
1346 510 : stream_apply_worker = winfo;
1347 510 : }
1348 :
1349 : /*
1350 : * Form a unique savepoint name for the streaming transaction.
1351 : *
1352 : * Note that different subscriptions for publications on different nodes can
1353 : * receive same remote xid, so we need to use subscription id along with it.
1354 : *
1355 : * Returns the name in the supplied buffer 'spname' of size 'szsp'. The name
     : * has the form "pg_sp_<suboid>_<xid>", both numbers printed as unsigned
     : * decimals; snprintf() limits the output to the buffer size.
1356 : */
1357 : static void
1358 27 : pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
1359 : {
1360 27 : snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
1361 27 : }
1362 :
1363 : /*
1364 : * Define a savepoint for a subxact in parallel apply worker if needed.
1365 : *
1366 : * The parallel apply worker can figure out if a new subtransaction was
1367 : * started by checking if the new change arrived with a different xid. In that
1368 : * case define a named savepoint, so that we are able to rollback to it
1369 : * if required.
     : *
     : * Nothing is done when current_xid equals top_xid or when current_xid is
     : * already recorded in subxactlist. Otherwise the savepoint is defined and
     : * current_xid is appended to subxactlist.
1370 : */
1371 : void
1372 68584 : pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
1373 : {
1374 68584 : if (current_xid != top_xid &&
1375 52 : !list_member_xid(subxactlist, current_xid))
1376 : {
1377 : MemoryContext oldctx;
1378 : char spname[NAMEDATALEN];
1379 :
1380 17 : pa_savepoint_name(MySubscription->oid, current_xid,
1381 : spname, sizeof(spname));
1382 :
1383 17 : elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1384 :
1385 : /* We must be in transaction block to define the SAVEPOINT. */
1386 17 : if (!IsTransactionBlock())
1387 : {
1388 5 : if (!IsTransactionState())
1389 0 : StartTransactionCommand();
1390 :
1391 5 : BeginTransactionBlock();
1392 5 : CommitTransactionCommand();
1393 : }
1394 :
1395 17 : DefineSavepoint(spname);
1396 :
1397 : /*
1398 : * CommitTransactionCommand is needed to start a subtransaction after
1399 : * issuing a SAVEPOINT inside a transaction block (see
1400 : * StartSubTransaction()).
1401 : */
1402 17 : CommitTransactionCommand();
1403 :
1404 17 : oldctx = MemoryContextSwitchTo(TopTransactionContext);
1405 17 : subxactlist = lappend_xid(subxactlist, current_xid);
1406 17 : MemoryContextSwitchTo(oldctx);
1407 : }
1408 68584 : }
1409 :
1410 : /* Reset the list that maintains subtransactions (subxactlist) to empty. */
1411 : void
1412 25 : pa_reset_subtrans(void)
1413 : {
1414 : /*
1415 : * We don't need to free this explicitly as the allocated memory will be
1416 : * freed at the transaction end.
1417 : */
1418 25 : subxactlist = NIL;
1419 25 : }
1420 :
1421 : /*
1422 : * Handle STREAM ABORT message when the transaction was applied in a parallel
1423 : * apply worker.
     : *
     : * 'abort_data' supplies the top-level xid, the aborted (sub)xid, and the
     : * abort LSN and time. If xid and subxid are equal the whole transaction is
     : * aborted; otherwise only the subtransaction is rolled back to its savepoint.
1424 : */
1425 : void
1426 12 : pa_stream_abort(LogicalRepStreamAbortData *abort_data)
1427 : {
1428 12 : TransactionId xid = abort_data->xid;
1429 12 : TransactionId subxid = abort_data->subxid;
1430 :
1431 : /*
1432 : * Update origin state so we can restart streaming from correct position
1433 : * in case of crash.
1434 : */
1435 12 : replorigin_xact_state.origin_lsn = abort_data->abort_lsn;
1436 12 : replorigin_xact_state.origin_timestamp = abort_data->abort_time;
1437 :
1438 : /*
1439 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1440 : * abort the whole transaction and reset the subxactlist.
1441 : */
1442 12 : if (subxid == xid)
1443 : {
1444 2 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1445 :
1446 : /*
1447 : * Release the lock as we might be processing an empty streaming
1448 : * transaction in which case the lock won't be released during
1449 : * transaction rollback.
1450 : *
1451 : * Note that it's ok to release the transaction lock before aborting
1452 : * the transaction because even if the parallel apply worker dies due
1453 : * to crash or some other reason, such a transaction would still be
1454 : * considered aborted.
1455 : */
1456 2 : pa_unlock_transaction(xid, AccessExclusiveLock);
1457 :
1458 2 : AbortCurrentTransaction();
1459 :
1460 2 : if (IsTransactionBlock())
1461 : {
1462 1 : EndTransactionBlock(false);
1463 1 : CommitTransactionCommand();
1464 : }
1465 :
1466 2 : pa_reset_subtrans();
1467 :
1468 2 : pgstat_report_activity(STATE_IDLE, NULL);
1469 : }
1470 : else
1471 : {
1472 : /* OK, so it's a subxact. Rollback to the savepoint. */
1473 : int i;
1474 : char spname[NAMEDATALEN];
1475 :
1476 10 : pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
1477 :
1478 10 : elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1479 :
1480 : /*
1481 : * Search the subxactlist, determine the offset tracked for the
1482 : * subxact, and truncate the list.
     : *
     : * The search runs from the last entry backwards. On a match the list
     : * is truncated to length i, which drops the matching entry and every
     : * entry after it.
1483 : *
1484 : * Note that for an empty sub-transaction we won't find the subxid
1485 : * here.
1486 : */
1487 12 : for (i = list_length(subxactlist) - 1; i >= 0; i--)
1488 : {
1489 11 : TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
1490 :
1491 11 : if (xid_tmp == subxid)
1492 : {
1493 9 : RollbackToSavepoint(spname);
1494 9 : CommitTransactionCommand();
1495 9 : subxactlist = list_truncate(subxactlist, i);
1496 9 : break;
1497 : }
1498 : }
1499 : }
1500 12 : }
1501 :
1502 : /*
1503 : * Set the fileset state for a particular parallel apply worker. The fileset
1504 : * will be set once the leader worker serialized all changes to the file
1505 : * so that it can be used by parallel apply worker.
     : *
     : * The state is stored under wshared->mutex. Only for FS_SERIALIZE_DONE is the
     : * leader's MyLogicalRepWorker->stream_fileset additionally copied into
     : * wshared->fileset, under the same lock.
1506 : */
1507 : void
1508 16 : pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
1509 : PartialFileSetState fileset_state)
1510 : {
1511 16 : SpinLockAcquire(&wshared->mutex);
1512 16 : wshared->fileset_state = fileset_state;
1513 :
1514 16 : if (fileset_state == FS_SERIALIZE_DONE)
1515 : {
1516 : Assert(am_leader_apply_worker());
1517 : Assert(MyLogicalRepWorker->stream_fileset);
1518 4 : wshared->fileset = *MyLogicalRepWorker->stream_fileset;
1519 : }
1520 :
1521 16 : SpinLockRelease(&wshared->mutex);
1522 16 : }
1523 :
1524 : /*
1525 : * Get the fileset state for the current parallel apply worker.
     : *
     : * Reads MyParallelShared->fileset_state while holding its mutex. Asserts
     : * that the caller is a parallel apply worker.
1526 : */
1527 : static PartialFileSetState
1528 72 : pa_get_fileset_state(void)
1529 : {
1530 : PartialFileSetState fileset_state;
1531 :
1532 : Assert(am_parallel_apply_worker());
1533 :
1534 72 : SpinLockAcquire(&MyParallelShared->mutex);
1535 72 : fileset_state = MyParallelShared->fileset_state;
1536 72 : SpinLockRelease(&MyParallelShared->mutex);
1537 :
1538 72 : return fileset_state;
1539 : }
1540 :
1541 : /*
1542 : * Helper functions to acquire and release a lock for each stream block.
1543 : *
1544 : * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
1545 : * stream lock.
1546 : *
     : * Both pa_lock_stream() and pa_unlock_stream() key the lock on the
     : * subscription id (MyLogicalRepWorker->subid) together with 'xid', and pass
     : * 'lockmode' through to the session-level lock/unlock routine.
     : *
1547 : * Refer to the comments atop this file to see how the stream lock is used.
1548 : */
1549 : void
1550 279 : pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
1551 : {
1552 279 : LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1553 : PARALLEL_APPLY_LOCK_STREAM, lockmode);
1554 277 : }
1555 :
1556 : void
1557 275 : pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
1558 : {
1559 275 : UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1560 : PARALLEL_APPLY_LOCK_STREAM, lockmode);
1561 275 : }
1562 :
1563 : /*
1564 : * Helper functions to acquire and release a lock for each local transaction
1565 : * apply.
1566 : *
1567 : * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
1568 : * transaction lock.
1569 : *
1570 : * Note that all the callers must pass a remote transaction ID instead of a
1571 : * local transaction ID as xid. This is because the local transaction ID will
1572 : * only be assigned while applying the first change in the parallel apply but
1573 : * it's possible that the first change in the parallel apply worker is blocked
1574 : * by a concurrently executing transaction in another parallel apply worker. We
1575 : * can only communicate the local transaction id to the leader after applying
1576 : * the first change so it won't be able to wait after sending the xact finish
1577 : * command using this lock.
1578 : *
     : * Both pa_lock_transaction() and pa_unlock_transaction() key the lock on the
     : * subscription id (MyLogicalRepWorker->subid) together with 'xid', and pass
     : * 'lockmode' through to the session-level lock/unlock routine.
     : *
1579 : * Refer to the comments atop this file to see how the transaction lock is
1580 : * used.
1581 : */
1582 : void
1583 56 : pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
1584 : {
1585 56 : LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1586 : PARALLEL_APPLY_LOCK_XACT, lockmode);
1587 54 : }
1588 :
1589 : void
1590 50 : pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
1591 : {
1592 50 : UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1593 : PARALLEL_APPLY_LOCK_XACT, lockmode);
1594 50 : }
1595 :
1596 : /*
1597 : * Decrement the number of pending streaming blocks and wait on the stream lock
1598 : * if there is no pending block available.
     : *
     : * If the counter is already zero, the function returns without touching it
     : * when spooled messages are pending, and raises an ERROR otherwise.
1599 : */
1600 : void
1601 254 : pa_decr_and_wait_stream_block(void)
1602 : {
1603 : Assert(am_parallel_apply_worker());
1604 :
1605 : /*
1606 : * It is only possible to not have any pending stream chunks when we are
1607 : * applying spooled messages.
1608 : */
1609 254 : if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
1610 : {
1611 16 : if (pa_has_spooled_message_pending())
1612 16 : return;
1613 :
1614 0 : elog(ERROR, "invalid pending streaming chunk 0");
1615 : }
1616 :
1617 238 : if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
1618 : {
1619 26 : pa_lock_stream(MyParallelShared->xid, AccessShareLock);
1620 24 : pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
1621 : }
1622 : }
1623 :
1624 : /*
1625 : * Finish processing the streaming transaction in the leader apply worker.
     : *
     : * 'winfo' is the parallel apply worker that handled the transaction. If
     : * 'remote_lsn' is a valid LSN, it is recorded via store_flush_position()
     : * together with the worker's last_commit_end. The worker is handed to
     : * pa_free_worker() at the end.
1626 : */
1627 : void
1628 27 : pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
1629 : {
1630 : Assert(am_leader_apply_worker());
1631 :
1632 : /*
1633 : * Unlock the shared object lock so that parallel apply worker can
1634 : * continue to receive and apply changes.
1635 : */
1636 27 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1637 :
1638 : /*
1639 : * Wait for that worker to finish. This is necessary to maintain commit
1640 : * order which avoids failures due to transaction dependencies and
1641 : * deadlocks.
1642 : */
1643 27 : pa_wait_for_xact_finish(winfo);
1644 :
1645 25 : if (XLogRecPtrIsValid(remote_lsn))
1646 23 : store_flush_position(remote_lsn, winfo->shared->last_commit_end);
1647 :
1648 25 : pa_free_worker(winfo);
1649 25 : }
|