LCOV - code coverage report
Current view: top level - src/backend/replication/logical - applyparallelworker.c (source / functions)
Test: PostgreSQL 19devel
Date: 2025-10-10 10:17:52
Coverage:   Lines: 368 / 411 (89.5 %)   Functions: 36 / 36 (100.0 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * applyparallelworker.c
       3             :  *     Support routines for applying xacts by parallel apply workers
       4             :  *
       5             :  * Copyright (c) 2023-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/applyparallelworker.c
       9             :  *
      10             :  * This file contains the code to launch, set up, and tear down a parallel apply
      11             :  * worker, which receives the changes from the leader worker and invokes routines
      12             :  * to apply them on the subscriber database. Additionally, this file contains
      13             :  * routines that are intended to support setting up, using, and tearing down a
      14             :  * ParallelApplyWorkerInfo which is required so the leader worker and parallel
      15             :  * apply workers can communicate with each other.
      16             :  *
      17             :  * The parallel apply workers are assigned (if available) as soon as xact's
      18             :  * first stream is received for subscriptions that have set their 'streaming'
      19             :  * option as parallel. The leader apply worker will send changes to this new
      20             :  * worker via shared memory. We keep this worker assigned until the transaction
      21             :  * commit is received and also wait for the worker to finish at commit. This
      22             :  * preserves commit ordering and avoids file I/O in most cases, although we
      23             :  * still need to spill to a file if there is no worker available. See comments
      24             :  * atop logical/worker.c to learn more about streamed xacts whose changes are
      25             :  * spilled to disk. It is important to maintain commit order to avoid failures
      26             :  * due to: (a) transaction dependencies - say if we insert a row in the first
      27             :  * transaction and update it in the second transaction on publisher then
      28             :  * allowing the subscriber to apply both in parallel can lead to failure in the
      29             :  * update; (b) deadlocks - allowing transactions that update the same set of
      30             :  * rows/tables in the opposite order to be applied in parallel can lead to
      31             :  * deadlocks.
      32             :  *
      33             :  * A worker pool is used to avoid restarting workers for each streaming
      34             :  * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
      35             :  * in the ParallelApplyWorkerPool. After successfully launching a new worker,
      36             :  * its information is added to the ParallelApplyWorkerPool. Once the worker
      37             :  * finishes applying the transaction, it is marked as available for reuse.
      38             :  * Now, before starting a new worker to apply the streaming transaction, we
      39             :  * check the list for any available worker. Note that we retain a maximum of
      40             :  * half the max_parallel_apply_workers_per_subscription workers in the pool and
      41             :  * after that, we simply exit the worker after applying the transaction.
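 * For example, with max_parallel_apply_workers_per_subscription = 4, at most
 * two workers are retained in the pool for reuse; any further worker exits
 * once it has applied its transaction.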
      42             :  *
      43             :  * XXX This worker pool threshold is arbitrary and we can provide a GUC
      44             :  * variable for this in the future if required.
      45             :  *
      46             :  * The leader apply worker will create a separate dynamic shared memory segment
      47             :  * when each parallel apply worker starts. The reason for this design is that
      48             :  * we cannot predict how many workers will be needed. It may be possible to
      49             :  * allocate enough shared memory in one segment based on the maximum number of
      50             :  * parallel apply workers (max_parallel_apply_workers_per_subscription), but
      51             :  * this would waste memory if no process is actually started.
      52             :  *
      53             :  * The dynamic shared memory segment contains: (a) a shm_mq that is used to
      54             :  * send changes in the transaction from leader apply worker to parallel apply
      55             :  * worker; (b) another shm_mq that is used to send errors (and other messages
      56             :  * reported via elog/ereport) from the parallel apply worker to leader apply
      57             :  * worker; (c) necessary information to be shared among parallel apply workers
      58             :  * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
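 *
 * As a rough picture of the layout (see pa_setup_dsm() below for the actual
 * setup; the sizes are the current defaults defined in this file):
 *
 *    PARALLEL_APPLY_KEY_SHARED        ParallelApplyWorkerShared
 *    PARALLEL_APPLY_KEY_MQ            shm_mq for LA -> PA changes (16 MB)
 *    PARALLEL_APPLY_KEY_ERROR_QUEUE   shm_mq for PA -> LA errors  (16 kB)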
      59             :  *
      60             :  * Locking Considerations
      61             :  * ----------------------
      62             :  * We have a risk of deadlock due to concurrently applying the transactions in
      63             :  * parallel mode that were independent on the publisher side but became
      64             :  * dependent on the subscriber side due to the different database structures
      65             :  * (like schema of subscription tables, constraints, etc.) on each side. This
      66             :  * can happen even without parallel mode when there are concurrent operations
      67             :  * on the subscriber. In order to detect the deadlocks among leader (LA) and
      68             :  * parallel apply (PA) workers, we use lmgr locks when the PA waits for the
      69             :  * next stream (set of changes) and LA waits for PA to finish the transaction.
      70             :  * An alternative approach could be to not allow parallelism when the schema of
      71             :  * tables is different between the publisher and subscriber but that would be
      72             :  * too restrictive and would require the publisher to send much more
      73             :  * information than it is currently sending.
      74             :  *
      75             :  * Consider a case where the subscribed table does not have a unique key on the
      76             :  * publisher and has a unique key on the subscriber. The deadlock can happen in
      77             :  * the following ways:
      78             :  *
      79             :  * 1) Deadlock between the leader apply worker and a parallel apply worker
      80             :  *
      81             :  * Consider that the parallel apply worker (PA) is executing TX-1 and the
      82             :  * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
      83             :  * Now, LA is waiting for PA because of the unique key constraint of the
      84             :  * subscribed table while PA is waiting for LA to send the next stream of
      85             :  * changes or transaction finish command message.
      86             :  *
      87             :  * In order for lmgr to detect this, we have LA acquire a session lock on the
      88             :  * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
      89             :  * trying to receive the next stream of changes. Specifically, LA will acquire
      90             :  * the lock in AccessExclusive mode before sending the STREAM_STOP and will
      91             :  * release it, if already acquired, after sending the STREAM_START, STREAM_ABORT
      92             :  * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
      93             :  * acquire the lock in AccessShare mode after processing STREAM_STOP and
      94             :  * STREAM_ABORT (for subtransaction) and then release the lock immediately
      95             :  * after acquiring it.
      96             :  *
      97             :  * The lock graph for the above example will look as follows:
      98             :  * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
      99             :  * acquire the stream lock) -> LA
     100             :  *
     101             :  * This way, when PA is waiting for LA for the next stream of changes, we can
     102             :  * have a wait-edge from PA to LA in lmgr, which will make us detect the
     103             :  * deadlock between LA and PA.
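 *
 * Schematically, as an illustration of the protocol described above (not a
 * verbatim excerpt of the code):
 *
 *    LA: pa_lock_stream(xid, AccessExclusiveLock)   -- before sending STREAM_STOP
 *    LA: send STREAM_STOP
 *        ... PA applies the stream received so far ...
 *    LA: send STREAM_START (or STREAM_PREPARE/STREAM_COMMIT)
 *    LA: pa_unlock_stream(xid, AccessExclusiveLock)
 *
 *    PA: process STREAM_STOP
 *    PA: pa_lock_stream(xid, AccessShareLock)       -- blocks while LA holds the lock
 *    PA: pa_unlock_stream(xid, AccessShareLock)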
     104             :  *
     105             :  * 2) Deadlock between the leader apply worker and parallel apply workers
     106             :  *
     107             :  * This scenario is similar to the first case but TX-1 and TX-2 are executed by
     108             :  * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
     109             :  * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
     110             :  * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
     111             :  * transaction in order to preserve the commit order. There is a deadlock among
     112             :  * the three processes.
     113             :  *
     114             :  * In order for lmgr to detect this, we have PA acquire a session lock (this is
     115             :  * a different lock than referred in the previous case, see
     116             :  * pa_lock_transaction()) on the transaction being applied and have LA wait on
     117             :  * the lock before proceeding in the transaction finish commands. Specifically,
     118             :  * PA will acquire this lock in AccessExclusive mode before executing the first
     119             :  * message of the transaction and release it at the xact end. LA will acquire
     120             :  * this lock in AccessShare mode at transaction finish commands and release it
     121             :  * immediately.
     122             :  *
     123             :  * The lock graph for the above example will look as follows:
     124             :  * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
     125             :  * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
     126             :  * lock) -> LA
     127             :  *
      128             :  * This way, when LA is waiting to finish the transaction end command to preserve
     129             :  * the commit order, we will be able to detect deadlock, if any.
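 *
 * Schematically, again as an illustration rather than a verbatim excerpt:
 *
 *    PA: pa_lock_transaction(xid, AccessExclusiveLock)  -- before the first change
 *    PA: apply the transaction's changes
 *    PA: release the lock at the end of the transaction
 *
 *    LA: pa_lock_transaction(xid, AccessShareLock)      -- at the finish commands
 *    LA: release the lock immediately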
     130             :  *
     131             :  * One might think we can use XactLockTableWait(), but XactLockTableWait()
     132             :  * considers PREPARED TRANSACTION as still in progress which means the lock
     133             :  * won't be released even after the parallel apply worker has prepared the
     134             :  * transaction.
     135             :  *
     136             :  * 3) Deadlock when the shm_mq buffer is full
     137             :  *
      138             :  * In the previous scenario (i.e., PA-1 and PA-2 are executing transactions
     139             :  * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
     140             :  * wait to send messages, and this wait doesn't appear in lmgr.
     141             :  *
     142             :  * To avoid this wait, we use a non-blocking write and wait with a timeout. If
     143             :  * the timeout is exceeded, the LA will serialize all the pending messages to
      144             :  * a file and indicate to PA-2 that it needs to read that file for the remaining
     145             :  * messages. Then LA will start waiting for commit as in the previous case
     146             :  * which will detect deadlock if any. See pa_send_data() and
     147             :  * enum TransApplyAction.
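 *
 * In rough pseudocode (see pa_send_data() for the real logic; the shape below
 * is illustrative only):
 *
 *    loop
 *        if a non-blocking shm_mq send succeeds -> done
 *        if the timeout has been exceeded:
 *            mark the shared fileset as being serialized, spool this and all
 *            remaining messages of the transaction to a file, and stop using
 *            the queue for this transaction
 *        otherwise wait briefly on the latch and retry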
     148             :  *
     149             :  * Lock types
     150             :  * ----------
     151             :  * Both the stream lock and the transaction lock mentioned above are
     152             :  * session-level locks because both locks could be acquired outside the
     153             :  * transaction, and the stream lock in the leader needs to persist across
     154             :  * transaction boundaries i.e. until the end of the streaming transaction.
     155             :  *-------------------------------------------------------------------------
     156             :  */
     157             : 
     158             : #include "postgres.h"
     159             : 
     160             : #include "libpq/pqformat.h"
     161             : #include "libpq/pqmq.h"
     162             : #include "pgstat.h"
     163             : #include "postmaster/interrupt.h"
     164             : #include "replication/logicallauncher.h"
     165             : #include "replication/logicalworker.h"
     166             : #include "replication/origin.h"
     167             : #include "replication/worker_internal.h"
     168             : #include "storage/ipc.h"
     169             : #include "storage/lmgr.h"
     170             : #include "tcop/tcopprot.h"
     171             : #include "utils/inval.h"
     172             : #include "utils/memutils.h"
     173             : #include "utils/syscache.h"
     174             : 
     175             : #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
     176             : 
     177             : /*
     178             :  * DSM keys for parallel apply worker. Unlike other parallel execution code,
      179             :  * since we don't need to worry about DSM keys conflicting with plan_node_id, we
     180             :  * can use small integers.
     181             :  */
     182             : #define PARALLEL_APPLY_KEY_SHARED       1
     183             : #define PARALLEL_APPLY_KEY_MQ           2
     184             : #define PARALLEL_APPLY_KEY_ERROR_QUEUE  3
     185             : 
     186             : /* Queue size of DSM, 16 MB for now. */
     187             : #define DSM_QUEUE_SIZE  (16 * 1024 * 1024)
     188             : 
     189             : /*
     190             :  * Error queue size of DSM. It is desirable to make it large enough that a
     191             :  * typical ErrorResponse can be sent without blocking. That way, a worker that
     192             :  * errors out can write the whole message into the queue and terminate without
     193             :  * waiting for the user backend.
     194             :  */
     195             : #define DSM_ERROR_QUEUE_SIZE            (16 * 1024)
     196             : 
     197             : /*
     198             :  * There are three fields in each message received by the parallel apply
      199             :  * worker: start_lsn, end_lsn and send_time. Because these statistics have
      200             :  * already been updated by the leader apply worker, we can ignore these fields in the
     201             :  * parallel apply worker (see function LogicalRepApplyLoop).
     202             :  */
     203             : #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
     204             : 
     205             : /*
      206             :  * The types of session-level locks on a transaction being applied on a logical
     207             :  * replication subscriber.
     208             :  */
     209             : #define PARALLEL_APPLY_LOCK_STREAM  0
     210             : #define PARALLEL_APPLY_LOCK_XACT    1
     211             : 
     212             : /*
     213             :  * Hash table entry to map xid to the parallel apply worker state.
     214             :  */
     215             : typedef struct ParallelApplyWorkerEntry
     216             : {
     217             :     TransactionId xid;          /* Hash key -- must be first */
     218             :     ParallelApplyWorkerInfo *winfo;
     219             : } ParallelApplyWorkerEntry;
     220             : 
     221             : /*
     222             :  * A hash table used to cache the state of streaming transactions being applied
     223             :  * by the parallel apply workers.
     224             :  */
     225             : static HTAB *ParallelApplyTxnHash = NULL;
     226             : 
     227             : /*
      228             :  * A list (pool) of active parallel apply workers. The information for
      229             :  * the new worker is added to the list after successfully launching it. The
      230             :  * list entry is removed if there are already enough workers in the worker
      231             :  * pool at the end of the transaction. For more information about the worker
      232             :  * pool, see comments atop this file.
     233             :  */
     234             : static List *ParallelApplyWorkerPool = NIL;
     235             : 
     236             : /*
     237             :  * Information shared between leader apply worker and parallel apply worker.
     238             :  */
     239             : ParallelApplyWorkerShared *MyParallelShared = NULL;
     240             : 
     241             : /*
     242             :  * Is there a message sent by a parallel apply worker that the leader apply
     243             :  * worker needs to receive?
     244             :  */
     245             : volatile sig_atomic_t ParallelApplyMessagePending = false;
     246             : 
     247             : /*
     248             :  * Cache the parallel apply worker information required for applying the
     249             :  * current streaming transaction. It is used to save the cost of searching the
     250             :  * hash table when applying the changes between STREAM_START and STREAM_STOP.
     251             :  */
     252             : static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
     253             : 
     254             : /* A list to maintain subtransactions, if any. */
     255             : static List *subxactlist = NIL;
     256             : 
     257             : static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
     258             : static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
     259             : static PartialFileSetState pa_get_fileset_state(void);
     260             : 
     261             : /*
     262             :  * Returns true if it is OK to start a parallel apply worker, false otherwise.
     263             :  */
     264             : static bool
     265         164 : pa_can_start(void)
     266             : {
     267             :     /* Only leader apply workers can start parallel apply workers. */
     268         164 :     if (!am_leader_apply_worker())
     269          54 :         return false;
     270             : 
     271             :     /*
      272             :      * It is good to check for any change in the subscription parameters to
      273             :      * avoid the case where a change doesn't get reflected for a very long
      274             :      * time. This can happen when there is a constant flow of streaming
     275             :      * transactions that are handled by parallel apply workers.
     276             :      *
     277             :      * It is better to do it before the below checks so that the latest values
      278             :      * of the subscription can be used for the checks.
     279             :      */
     280         110 :     maybe_reread_subscription();
     281             : 
     282             :     /*
     283             :      * Don't start a new parallel apply worker if the subscription is not
     284             :      * using parallel streaming mode, or if the publisher does not support
     285             :      * parallel apply.
     286             :      */
     287         110 :     if (!MyLogicalRepWorker->parallel_apply)
     288          56 :         return false;
     289             : 
     290             :     /*
      291             :      * Don't start a new parallel worker if the user has set skiplsn, as it's
     292             :      * possible that they want to skip the streaming transaction. For
     293             :      * streaming transactions, we need to serialize the transaction to a file
     294             :      * so that we can get the last LSN of the transaction to judge whether to
     295             :      * skip before starting to apply the change.
     296             :      *
      297             :      * One might think that we could allow parallelism if the first LSN of the
     298             :      * transaction is greater than skiplsn, but we don't send it with the
     299             :      * STREAM START message, and it doesn't seem worth sending the extra eight
     300             :      * bytes with the STREAM START to enable parallelism for this case.
     301             :      */
     302          54 :     if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
     303           0 :         return false;
     304             : 
     305             :     /*
     306             :      * For streaming transactions that are being applied using a parallel
     307             :      * apply worker, we cannot decide whether to apply the change for a
     308             :      * relation that is not in the READY state (see
     309             :      * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
     310             :      * time. So, we don't start the new parallel apply worker in this case.
     311             :      */
     312          54 :     if (!AllTablesyncsReady())
     313           0 :         return false;
     314             : 
     315          54 :     return true;
     316             : }
     317             : 
     318             : /*
     319             :  * Set up a dynamic shared memory segment.
     320             :  *
     321             :  * We set up a control region that contains a fixed-size worker info
     322             :  * (ParallelApplyWorkerShared), a message queue, and an error queue.
     323             :  *
     324             :  * Returns true on success, false on failure.
     325             :  */
     326             : static bool
     327          20 : pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
     328             : {
     329             :     shm_toc_estimator e;
     330             :     Size        segsize;
     331             :     dsm_segment *seg;
     332             :     shm_toc    *toc;
     333             :     ParallelApplyWorkerShared *shared;
     334             :     shm_mq     *mq;
     335          20 :     Size        queue_size = DSM_QUEUE_SIZE;
     336          20 :     Size        error_queue_size = DSM_ERROR_QUEUE_SIZE;
     337             : 
     338             :     /*
     339             :      * Estimate how much shared memory we need.
     340             :      *
     341             :      * Because the TOC machinery may choose to insert padding of oddly-sized
     342             :      * requests, we must estimate each chunk separately.
     343             :      *
     344             :      * We need one key to register the location of the header, and two other
     345             :      * keys to track the locations of the message queue and the error message
     346             :      * queue.
     347             :      */
     348          20 :     shm_toc_initialize_estimator(&e);
     349          20 :     shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
     350          20 :     shm_toc_estimate_chunk(&e, queue_size);
     351          20 :     shm_toc_estimate_chunk(&e, error_queue_size);
     352             : 
     353          20 :     shm_toc_estimate_keys(&e, 3);
     354          20 :     segsize = shm_toc_estimate(&e);
     355             : 
     356             :     /* Create the shared memory segment and establish a table of contents. */
     357          20 :     seg = dsm_create(shm_toc_estimate(&e), 0);
     358          20 :     if (!seg)
     359           0 :         return false;
     360             : 
     361          20 :     toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
     362             :                          segsize);
     363             : 
     364             :     /* Set up the header region. */
     365          20 :     shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
     366          20 :     SpinLockInit(&shared->mutex);
     367             : 
     368          20 :     shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     369          20 :     pg_atomic_init_u32(&(shared->pending_stream_count), 0);
     370          20 :     shared->last_commit_end = InvalidXLogRecPtr;
     371          20 :     shared->fileset_state = FS_EMPTY;
     372             : 
     373          20 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
     374             : 
     375             :     /* Set up message queue for the worker. */
     376          20 :     mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
     377          20 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
     378          20 :     shm_mq_set_sender(mq, MyProc);
     379             : 
     380             :     /* Attach the queue. */
     381          20 :     winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
     382             : 
     383             :     /* Set up error queue for the worker. */
     384          20 :     mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
     385             :                        error_queue_size);
     386          20 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
     387          20 :     shm_mq_set_receiver(mq, MyProc);
     388             : 
     389             :     /* Attach the queue. */
     390          20 :     winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
     391             : 
     392             :     /* Return results to caller. */
     393          20 :     winfo->dsm_seg = seg;
     394          20 :     winfo->shared = shared;
     395             : 
     396          20 :     return true;
     397             : }
     398             : 
     399             : /*
     400             :  * Try to get a parallel apply worker from the pool. If none is available then
     401             :  * start a new one.
     402             :  */
     403             : static ParallelApplyWorkerInfo *
     404          54 : pa_launch_parallel_worker(void)
     405             : {
     406             :     MemoryContext oldcontext;
     407             :     bool        launched;
     408             :     ParallelApplyWorkerInfo *winfo;
     409             :     ListCell   *lc;
     410             : 
     411             :     /* Try to get an available parallel apply worker from the worker pool. */
     412          58 :     foreach(lc, ParallelApplyWorkerPool)
     413             :     {
     414          38 :         winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     415             : 
     416          38 :         if (!winfo->in_use)
     417          34 :             return winfo;
     418             :     }
     419             : 
     420             :     /*
     421             :      * Start a new parallel apply worker.
     422             :      *
     423             :      * The worker info can be used for the lifetime of the worker process, so
     424             :      * create it in a permanent context.
     425             :      */
     426          20 :     oldcontext = MemoryContextSwitchTo(ApplyContext);
     427             : 
     428          20 :     winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
     429             : 
     430             :     /* Setup shared memory. */
     431          20 :     if (!pa_setup_dsm(winfo))
     432             :     {
     433           0 :         MemoryContextSwitchTo(oldcontext);
     434           0 :         pfree(winfo);
     435           0 :         return NULL;
     436             :     }
     437             : 
     438          20 :     launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
     439          20 :                                         MyLogicalRepWorker->dbid,
     440          20 :                                         MySubscription->oid,
     441          20 :                                         MySubscription->name,
     442          20 :                                         MyLogicalRepWorker->userid,
     443             :                                         InvalidOid,
     444             :                                         dsm_segment_handle(winfo->dsm_seg),
     445             :                                         false);
     446             : 
     447          20 :     if (launched)
     448             :     {
     449          20 :         ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
     450             :     }
     451             :     else
     452             :     {
     453           0 :         pa_free_worker_info(winfo);
     454           0 :         winfo = NULL;
     455             :     }
     456             : 
     457          20 :     MemoryContextSwitchTo(oldcontext);
     458             : 
     459          20 :     return winfo;
     460             : }
     461             : 
     462             : /*
     463             :  * Allocate a parallel apply worker that will be used for the specified xid.
     464             :  *
      465             :  * We first try to get an available worker from the pool, if any; otherwise we
      466             :  * try to launch a new worker. On successful allocation, remember the worker
     467             :  * information in the hash table so that we can get it later for processing the
     468             :  * streaming changes.
     469             :  */
     470             : void
     471         164 : pa_allocate_worker(TransactionId xid)
     472             : {
     473             :     bool        found;
     474         164 :     ParallelApplyWorkerInfo *winfo = NULL;
     475             :     ParallelApplyWorkerEntry *entry;
     476             : 
     477         164 :     if (!pa_can_start())
     478         110 :         return;
     479             : 
     480          54 :     winfo = pa_launch_parallel_worker();
     481          54 :     if (!winfo)
     482           0 :         return;
     483             : 
     484             :     /* First time through, initialize parallel apply worker state hashtable. */
     485          54 :     if (!ParallelApplyTxnHash)
     486             :     {
     487             :         HASHCTL     ctl;
     488             : 
     489         182 :         MemSet(&ctl, 0, sizeof(ctl));
     490          14 :         ctl.keysize = sizeof(TransactionId);
     491          14 :         ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
     492          14 :         ctl.hcxt = ApplyContext;
     493             : 
     494          14 :         ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
     495             :                                            16, &ctl,
     496             :                                            HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     497             :     }
     498             : 
     499             :     /* Create an entry for the requested transaction. */
     500          54 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
     501          54 :     if (found)
     502           0 :         elog(ERROR, "hash table corrupted");
     503             : 
     504             :     /* Update the transaction information in shared memory. */
     505          54 :     SpinLockAcquire(&winfo->shared->mutex);
     506          54 :     winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     507          54 :     winfo->shared->xid = xid;
     508          54 :     SpinLockRelease(&winfo->shared->mutex);
     509             : 
     510          54 :     winfo->in_use = true;
     511          54 :     winfo->serialize_changes = false;
     512          54 :     entry->winfo = winfo;
     513             : }
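
/*
 * Editorial sketch, not part of the original file: the intended calling
 * pattern on the leader side, as described in the comments above. In the
 * real code this is driven by the STREAM START handling in logical/worker.c;
 * the function name below is hypothetical and the block is not compiled.
 */
#ifdef PA_USAGE_SKETCH
static void
example_handle_stream_start(TransactionId remote_xid)
{
    ParallelApplyWorkerInfo *winfo;

    /* May silently do nothing if parallel apply is not possible right now. */
    pa_allocate_worker(remote_xid);

    /* NULL means no worker was assigned; fall back to spooling to a file. */
    winfo = pa_find_worker(remote_xid);
    if (winfo == NULL)
        return;

    /* Subsequent changes for remote_xid are sent to winfo's message queue. */
}
#endif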
     514             : 
     515             : /*
     516             :  * Find the assigned worker for the given transaction, if any.
     517             :  */
     518             : ParallelApplyWorkerInfo *
     519      514632 : pa_find_worker(TransactionId xid)
     520             : {
     521             :     bool        found;
     522             :     ParallelApplyWorkerEntry *entry;
     523             : 
     524      514632 :     if (!TransactionIdIsValid(xid))
     525      160286 :         return NULL;
     526             : 
     527      354346 :     if (!ParallelApplyTxnHash)
     528      206480 :         return NULL;
     529             : 
     530             :     /* Return the cached parallel apply worker if valid. */
     531      147866 :     if (stream_apply_worker)
     532      147290 :         return stream_apply_worker;
     533             : 
     534             :     /* Find an entry for the requested transaction. */
     535         576 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
     536         576 :     if (found)
     537             :     {
     538             :         /* The worker must not have exited.  */
     539             :         Assert(entry->winfo->in_use);
     540         576 :         return entry->winfo;
     541             :     }
     542             : 
     543           0 :     return NULL;
     544             : }
     545             : 
     546             : /*
     547             :  * Makes the worker available for reuse.
     548             :  *
      549             :  * This removes the parallel apply worker entry from the hash table so that the
      550             :  * worker can no longer be found. If there are enough workers in the pool, it stops the worker
     551             :  * and frees the corresponding info. Otherwise it just marks the worker as
     552             :  * available for reuse.
     553             :  *
     554             :  * For more information about the worker pool, see comments atop this file.
     555             :  */
     556             : static void
     557          48 : pa_free_worker(ParallelApplyWorkerInfo *winfo)
     558             : {
     559             :     Assert(!am_parallel_apply_worker());
     560             :     Assert(winfo->in_use);
     561             :     Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
     562             : 
     563          48 :     if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
     564           0 :         elog(ERROR, "hash table corrupted");
     565             : 
     566             :     /*
     567             :      * Stop the worker if there are enough workers in the pool.
     568             :      *
     569             :      * XXX Additionally, we also stop the worker if the leader apply worker
      570             :      * serialized part of the transaction data due to a send timeout. This is
     571             :      * because the message could be partially written to the queue and there
     572             :      * is no way to clean the queue other than resending the message until it
     573             :      * succeeds. Instead of trying to send the data which anyway would have
     574             :      * been serialized and then letting the parallel apply worker deal with
     575             :      * the spurious message, we stop the worker.
     576             :      */
     577          48 :     if (winfo->serialize_changes ||
     578          40 :         list_length(ParallelApplyWorkerPool) >
     579          40 :         (max_parallel_apply_workers_per_subscription / 2))
     580             :     {
     581          10 :         logicalrep_pa_worker_stop(winfo);
     582          10 :         pa_free_worker_info(winfo);
     583             : 
     584          10 :         return;
     585             :     }
     586             : 
     587          38 :     winfo->in_use = false;
     588          38 :     winfo->serialize_changes = false;
     589             : }
     590             : 
     591             : /*
     592             :  * Free the parallel apply worker information and unlink the files with
     593             :  * serialized changes if any.
     594             :  */
     595             : static void
     596          10 : pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
     597             : {
     598             :     Assert(winfo);
     599             : 
     600          10 :     if (winfo->mq_handle)
     601          10 :         shm_mq_detach(winfo->mq_handle);
     602             : 
     603          10 :     if (winfo->error_mq_handle)
     604           0 :         shm_mq_detach(winfo->error_mq_handle);
     605             : 
     606             :     /* Unlink the files with serialized changes. */
     607          10 :     if (winfo->serialize_changes)
     608           8 :         stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
     609             : 
     610          10 :     if (winfo->dsm_seg)
     611          10 :         dsm_detach(winfo->dsm_seg);
     612             : 
     613             :     /* Remove from the worker pool. */
     614          10 :     ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
     615             : 
     616          10 :     pfree(winfo);
     617          10 : }
     618             : 
     619             : /*
     620             :  * Detach the error queue for all parallel apply workers.
     621             :  */
     622             : void
     623         614 : pa_detach_all_error_mq(void)
     624             : {
     625             :     ListCell   *lc;
     626             : 
     627         624 :     foreach(lc, ParallelApplyWorkerPool)
     628             :     {
     629          10 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     630             : 
     631          10 :         if (winfo->error_mq_handle)
     632             :         {
     633          10 :             shm_mq_detach(winfo->error_mq_handle);
     634          10 :             winfo->error_mq_handle = NULL;
     635             :         }
     636             :     }
     637         614 : }
     638             : 
     639             : /*
     640             :  * Check if there are any pending spooled messages.
     641             :  */
     642             : static bool
      643          32 : pa_has_spooled_message_pending(void)
     644             : {
     645             :     PartialFileSetState fileset_state;
     646             : 
     647          32 :     fileset_state = pa_get_fileset_state();
     648             : 
     649          32 :     return (fileset_state != FS_EMPTY);
     650             : }
     651             : 
     652             : /*
     653             :  * Replay the spooled messages once the leader apply worker has finished
     654             :  * serializing changes to the file.
     655             :  *
     656             :  * Returns false if there aren't any pending spooled messages, true otherwise.
     657             :  */
     658             : static bool
     659         108 : pa_process_spooled_messages_if_required(void)
     660             : {
     661             :     PartialFileSetState fileset_state;
     662             : 
     663         108 :     fileset_state = pa_get_fileset_state();
     664             : 
     665         108 :     if (fileset_state == FS_EMPTY)
     666          92 :         return false;
     667             : 
     668             :     /*
     669             :      * If the leader apply worker is busy serializing the partial changes then
     670             :      * acquire the stream lock now and wait for the leader worker to finish
     671             :      * serializing the changes. Otherwise, the parallel apply worker won't get
     672             :      * a chance to receive a STREAM_STOP (and acquire the stream lock) until
     673             :      * the leader had serialized all changes which can lead to undetected
     674             :      * deadlock.
     675             :      *
     676             :      * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
     677             :      * worker has finished serializing the changes.
     678             :      */
     679          16 :     if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
     680             :     {
     681           0 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
     682           0 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
     683             : 
     684           0 :         fileset_state = pa_get_fileset_state();
     685             :     }
     686             : 
     687             :     /*
     688             :      * We cannot read the file immediately after the leader has serialized all
     689             :      * changes to the file because there may still be messages in the memory
     690             :      * queue. We will apply all spooled messages the next time we call this
     691             :      * function and that will ensure there are no messages left in the memory
     692             :      * queue.
     693             :      */
     694          16 :     if (fileset_state == FS_SERIALIZE_DONE)
     695             :     {
     696           8 :         pa_set_fileset_state(MyParallelShared, FS_READY);
     697             :     }
     698           8 :     else if (fileset_state == FS_READY)
     699             :     {
     700           8 :         apply_spooled_messages(&MyParallelShared->fileset,
     701           8 :                                MyParallelShared->xid,
     702             :                                InvalidXLogRecPtr);
     703           8 :         pa_set_fileset_state(MyParallelShared, FS_EMPTY);
     704             :     }
     705             : 
     706          16 :     return true;
     707             : }
     708             : 
     709             : /*
     710             :  * Interrupt handler for main loop of parallel apply worker.
     711             :  */
     712             : static void
     713      127864 : ProcessParallelApplyInterrupts(void)
     714             : {
     715      127864 :     CHECK_FOR_INTERRUPTS();
     716             : 
     717      127860 :     if (ShutdownRequestPending)
     718             :     {
     719          10 :         ereport(LOG,
     720             :                 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
     721             :                         MySubscription->name)));
     722             : 
     723          10 :         proc_exit(0);
     724             :     }
     725             : 
     726      127850 :     if (ConfigReloadPending)
     727             :     {
     728           8 :         ConfigReloadPending = false;
     729           8 :         ProcessConfigFile(PGC_SIGHUP);
     730             :     }
     731      127850 : }
     732             : 
     733             : /* Parallel apply worker main loop. */
     734             : static void
     735          20 : LogicalParallelApplyLoop(shm_mq_handle *mqh)
     736             : {
     737             :     shm_mq_result shmq_res;
     738             :     ErrorContextCallback errcallback;
     739          20 :     MemoryContext oldcxt = CurrentMemoryContext;
     740             : 
     741             :     /*
      742             :      * Init the ApplyMessageContext, which we clean up after each replication
     743             :      * protocol message.
     744             :      */
     745          20 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
     746             :                                                 "ApplyMessageContext",
     747             :                                                 ALLOCSET_DEFAULT_SIZES);
     748             : 
     749             :     /*
     750             :      * Push apply error context callback. Fields will be filled while applying
     751             :      * a change.
     752             :      */
     753          20 :     errcallback.callback = apply_error_callback;
     754          20 :     errcallback.previous = error_context_stack;
     755          20 :     error_context_stack = &errcallback;
     756             : 
     757             :     for (;;)
     758      127844 :     {
     759             :         void       *data;
     760             :         Size        len;
     761             : 
     762      127864 :         ProcessParallelApplyInterrupts();
     763             : 
     764             :         /* Ensure we are reading the data into our memory context. */
     765      127850 :         MemoryContextSwitchTo(ApplyMessageContext);
     766             : 
     767      127850 :         shmq_res = shm_mq_receive(mqh, &len, &data, true);
     768             : 
     769      127850 :         if (shmq_res == SHM_MQ_SUCCESS)
     770             :         {
     771             :             StringInfoData s;
     772             :             int         c;
     773             : 
     774      127742 :             if (len == 0)
     775           0 :                 elog(ERROR, "invalid message length");
     776             : 
     777      127742 :             initReadOnlyStringInfo(&s, data, len);
     778             : 
     779             :             /*
     780             :              * The first byte of messages sent from leader apply worker to
     781             :              * parallel apply workers can only be PqReplMsg_WALData.
     782             :              */
     783      127742 :             c = pq_getmsgbyte(&s);
     784      127742 :             if (c != PqReplMsg_WALData)
     785           0 :                 elog(ERROR, "unexpected message \"%c\"", c);
     786             : 
     787             :             /*
     788             :              * Ignore statistics fields that have been updated by the leader
     789             :              * apply worker.
     790             :              *
     791             :              * XXX We can avoid sending the statistics fields from the leader
     792             :              * apply worker but for that, it needs to rebuild the entire
     793             :              * message by removing these fields which could be more work than
     794             :              * simply ignoring these fields in the parallel apply worker.
     795             :              */
     796      127742 :             s.cursor += SIZE_STATS_MESSAGE;
     797             : 
     798      127742 :             apply_dispatch(&s);
     799             :         }
     800         108 :         else if (shmq_res == SHM_MQ_WOULD_BLOCK)
     801             :         {
     802             :             /* Replay the changes from the file, if any. */
     803         108 :             if (!pa_process_spooled_messages_if_required())
     804             :             {
     805             :                 int         rc;
     806             : 
     807             :                 /* Wait for more work. */
     808          92 :                 rc = WaitLatch(MyLatch,
     809             :                                WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     810             :                                1000L,
     811             :                                WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
     812             : 
     813          92 :                 if (rc & WL_LATCH_SET)
     814          86 :                     ResetLatch(MyLatch);
     815             :             }
     816             :         }
     817             :         else
     818             :         {
     819             :             Assert(shmq_res == SHM_MQ_DETACHED);
     820             : 
     821           0 :             ereport(ERROR,
     822             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     823             :                      errmsg("lost connection to the logical replication apply worker")));
     824             :         }
     825             : 
     826      127844 :         MemoryContextReset(ApplyMessageContext);
     827      127844 :         MemoryContextSwitchTo(oldcxt);
     828             :     }
     829             : 
     830             :     /* Pop the error context stack. */
     831             :     error_context_stack = errcallback.previous;
     832             : 
     833             :     MemoryContextSwitchTo(oldcxt);
     834             : }
     835             : 
     836             : /*
     837             :  * Make sure the leader apply worker tries to read from our error queue one more
     838             :  * time. This guards against the case where we exit uncleanly without sending
     839             :  * an ErrorResponse, for example because some code calls proc_exit directly.
     840             :  *
     841             :  * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
     842             :  * if any. See ParallelWorkerShutdown for details.
     843             :  */
     844             : static void
     845          20 : pa_shutdown(int code, Datum arg)
     846             : {
     847          20 :     SendProcSignal(MyLogicalRepWorker->leader_pid,
     848             :                    PROCSIG_PARALLEL_APPLY_MESSAGE,
     849             :                    INVALID_PROC_NUMBER);
     850             : 
     851          20 :     dsm_detach((dsm_segment *) DatumGetPointer(arg));
     852          20 : }
     853             : 
     854             : /*
     855             :  * Parallel apply worker entry point.
     856             :  */
     857             : void
     858          20 : ParallelApplyWorkerMain(Datum main_arg)
     859             : {
     860             :     ParallelApplyWorkerShared *shared;
     861             :     dsm_handle  handle;
     862             :     dsm_segment *seg;
     863             :     shm_toc    *toc;
     864             :     shm_mq     *mq;
     865             :     shm_mq_handle *mqh;
     866             :     shm_mq_handle *error_mqh;
     867             :     RepOriginId originid;
     868          20 :     int         worker_slot = DatumGetInt32(main_arg);
     869             :     char        originname[NAMEDATALEN];
     870             : 
     871          20 :     InitializingApplyWorker = true;
     872             : 
     873             :     /*
     874             :      * Setup signal handling.
     875             :      *
     876             :      * Note: We intentionally used SIGUSR2 to trigger a graceful shutdown
     877             :      * initiated by the leader apply worker. This helps to differentiate it
     878             :      * from the case where we abort the current transaction and exit on
     879             :      * receiving SIGTERM.
     880             :      */
     881          20 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     882          20 :     pqsignal(SIGTERM, die);
     883          20 :     pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
     884          20 :     BackgroundWorkerUnblockSignals();
     885             : 
     886             :     /*
     887             :      * Attach to the dynamic shared memory segment for the parallel apply, and
     888             :      * find its table of contents.
     889             :      *
     890             :      * Like parallel query, we don't need resource owner by this time. See
     891             :      * ParallelWorkerMain.
     892             :      */
     893          20 :     memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
     894          20 :     seg = dsm_attach(handle);
     895          20 :     if (!seg)
     896           0 :         ereport(ERROR,
     897             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     898             :                  errmsg("could not map dynamic shared memory segment")));
     899             : 
     900          20 :     toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
     901          20 :     if (!toc)
     902           0 :         ereport(ERROR,
     903             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     904             :                  errmsg("invalid magic number in dynamic shared memory segment")));
     905             : 
     906             :     /* Look up the shared information. */
     907          20 :     shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
     908          20 :     MyParallelShared = shared;
     909             : 
     910             :     /*
     911             :      * Attach to the message queue.
     912             :      */
     913          20 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
     914          20 :     shm_mq_set_receiver(mq, MyProc);
     915          20 :     mqh = shm_mq_attach(mq, seg, NULL);
     916             : 
     917             :     /*
     918             :      * Primary initialization is complete. Now, we can attach to our slot.
     919             :      * This is to ensure that the leader apply worker does not write data to
     920             :      * the uninitialized memory queue.
     921             :      */
     922          20 :     logicalrep_worker_attach(worker_slot);
     923             : 
     924             :     /*
     925             :      * Register the shutdown callback after we are attached to the worker
     926             :      * slot. This is to ensure that MyLogicalRepWorker remains valid when this
     927             :      * callback is invoked.
     928             :      */
     929          20 :     before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
     930             : 
     931          20 :     SpinLockAcquire(&MyParallelShared->mutex);
     932          20 :     MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
     933          20 :     MyParallelShared->logicalrep_worker_slot_no = worker_slot;
     934          20 :     SpinLockRelease(&MyParallelShared->mutex);
     935             : 
     936             :     /*
     937             :      * Attach to the error queue.
     938             :      */
     939          20 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
     940          20 :     shm_mq_set_sender(mq, MyProc);
     941          20 :     error_mqh = shm_mq_attach(mq, seg, NULL);
     942             : 
     943          20 :     pq_redirect_to_shm_mq(seg, error_mqh);
     944          20 :     pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
     945             :                            INVALID_PROC_NUMBER);
     946             : 
     947          20 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
     948          20 :         MyLogicalRepWorker->reply_time = 0;
     949             : 
     950          20 :     InitializeLogRepWorker();
     951             : 
     952          20 :     InitializingApplyWorker = false;
     953             : 
     954             :     /* Setup replication origin tracking. */
     955          20 :     StartTransactionCommand();
     956          20 :     ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
     957             :                                        originname, sizeof(originname));
     958          20 :     originid = replorigin_by_name(originname, false);
     959             : 
     960             :     /*
     961             :      * The parallel apply worker doesn't need to monopolize this replication
     962             :      * origin which was already acquired by its leader process.
     963             :      */
     964          20 :     replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
     965          20 :     replorigin_session_origin = originid;
     966          20 :     CommitTransactionCommand();
     967             : 
     968             :     /*
     969             :      * Set up a syscache callback so that we know when something changes in
     970             :      * the subscription relation state.
     971             :      */
     972          20 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
     973             :                                   invalidate_syncing_table_states,
     974             :                                   (Datum) 0);
     975             : 
     976          20 :     set_apply_error_context_origin(originname);
     977             : 
     978          20 :     LogicalParallelApplyLoop(mqh);
     979             : 
     980             :     /*
     981             :      * The parallel apply worker must not get here: it will only stop when
     982             :      * it receives a SIGTERM or SIGUSR2 from the leader, a SIGINT from
     983             :      * itself, or when there is an error, and none of these cases allows the
     984             :      * code to reach this point.
     985             :      */
     986             :     Assert(false);
     987           0 : }
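/*
 * Illustrative sketch (not part of the original file): the leader-side
 * counterpart of the attach sequence above.  It is assumed the leader builds
 * the DSM table of contents with the same keys, registers itself as sender
 * on the data queue and as receiver on the error queue, and keeps the
 * resulting handles in the ParallelApplyWorkerInfo.  The function name, the
 * size parameters, and the PA_EXAMPLE_ONLY guard (which keeps the snippet
 * from being compiled) are placeholders, not the real setup routine.
 */
#ifdef PA_EXAMPLE_ONLY
static void
pa_example_leader_setup_queues(dsm_segment *seg, shm_toc *toc,
                               ParallelApplyWorkerInfo *winfo,
                               Size queue_size, Size error_queue_size)
{
    shm_mq     *mq;

    /* Data queue: the leader sends changes, the parallel apply worker receives. */
    mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
    shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
    shm_mq_set_sender(mq, MyProc);
    winfo->mq_handle = shm_mq_attach(mq, seg, NULL);

    /* Error queue: the worker sends errors, the leader receives them. */
    mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
                       error_queue_size);
    shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
    shm_mq_set_receiver(mq, MyProc);
    winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
}
#endif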
     988             : 
     989             : /*
     990             :  * Handle receipt of an interrupt indicating a parallel apply worker message.
     991             :  *
     992             :  * Note: this is called within a signal handler! All we can do is set a flag
     993             :  * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
     994             :  * ProcessParallelApplyMessages().
     995             :  */
     996             : void
     997          22 : HandleParallelApplyMessageInterrupt(void)
     998             : {
     999          22 :     InterruptPending = true;
    1000          22 :     ParallelApplyMessagePending = true;
    1001          22 :     SetLatch(MyLatch);
    1002          22 : }
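/*
 * Illustrative sketch (not part of the original file): the flags set above
 * are consumed at the next CHECK_FOR_INTERRUPTS().  The backend's
 * ProcessInterrupts() is expected to contain a check along these lines; the
 * fragment below is only a reminder of that pattern, not the actual code.
 */
#ifdef PA_EXAMPLE_ONLY
static void
pa_example_process_interrupts_fragment(void)
{
    /* ... other interrupt handling elided ... */
    if (ParallelApplyMessagePending)
        ProcessParallelApplyMessages();
}
#endif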
    1003             : 
    1004             : /*
    1005             :  * Process a single protocol message received from a single parallel apply
    1006             :  * worker.
    1007             :  */
    1008             : static void
    1009           2 : ProcessParallelApplyMessage(StringInfo msg)
    1010             : {
    1011             :     char        msgtype;
    1012             : 
    1013           2 :     msgtype = pq_getmsgbyte(msg);
    1014             : 
    1015           2 :     switch (msgtype)
    1016             :     {
    1017           2 :         case PqMsg_ErrorResponse:
    1018             :             {
    1019             :                 ErrorData   edata;
    1020             : 
    1021             :                 /* Parse ErrorResponse. */
    1022           2 :                 pq_parse_errornotice(msg, &edata);
    1023             : 
    1024             :                 /*
    1025             :                  * If desired, add a context line to show that this is a
    1026             :                  * message propagated from a parallel apply worker. Otherwise,
    1027             :                  * it can sometimes be confusing to understand what actually
    1028             :                  * happened.
    1029             :                  */
    1030           2 :                 if (edata.context)
    1031           2 :                     edata.context = psprintf("%s\n%s", edata.context,
    1032             :                                              _("logical replication parallel apply worker"));
    1033             :                 else
    1034           0 :                     edata.context = pstrdup(_("logical replication parallel apply worker"));
    1035             : 
    1036             :                 /*
    1037             :                  * Context beyond that should use the error context callbacks
    1038             :                  * that were in effect in LogicalRepApplyLoop().
    1039             :                  */
    1040           2 :                 error_context_stack = apply_error_context_stack;
    1041             : 
    1042             :                 /*
    1043             :                  * The actual error must have been reported by the parallel
    1044             :                  * apply worker.
    1045             :                  */
    1046           2 :                 ereport(ERROR,
    1047             :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1048             :                          errmsg("logical replication parallel apply worker exited due to error"),
    1049             :                          errcontext("%s", edata.context)));
    1050             :             }
    1051             : 
    1052             :             /*
    1053             :              * Don't need to do anything about NoticeResponse and
    1054             :              * NotificationResponse as the logical replication worker doesn't
    1055             :              * need to send messages to the client.
    1056             :              */
    1057           0 :         case PqMsg_NoticeResponse:
    1058             :         case PqMsg_NotificationResponse:
    1059           0 :             break;
    1060             : 
    1061           0 :         default:
    1062           0 :             elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
    1063             :                  msgtype, msg->len);
    1064             :     }
    1065           0 : }
    1066             : 
    1067             : /*
    1068             :  * Handle any queued protocol messages received from parallel apply workers.
    1069             :  */
    1070             : void
    1071          12 : ProcessParallelApplyMessages(void)
    1072             : {
    1073             :     ListCell   *lc;
    1074             :     MemoryContext oldcontext;
    1075             : 
    1076             :     static MemoryContext hpam_context = NULL;
    1077             : 
    1078             :     /*
    1079             :      * This is invoked from ProcessInterrupts(), and since some of the
    1080             :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
    1081             :      * for recursive calls if more signals are received while this runs. It's
    1082             :      * unclear that recursive entry would be safe, and it doesn't seem useful
    1083             :      * even if it is safe, so let's block interrupts until done.
    1084             :      */
    1085          12 :     HOLD_INTERRUPTS();
    1086             : 
    1087             :     /*
    1088             :      * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
    1089             :      * don't want to risk leaking data into long-lived contexts, so let's do
    1090             :      * our work here in a private context that we can reset on each use.
    1091             :      */
    1092          12 :     if (!hpam_context)          /* first time through? */
    1093          10 :         hpam_context = AllocSetContextCreate(TopMemoryContext,
    1094             :                                              "ProcessParallelApplyMessages",
    1095             :                                              ALLOCSET_DEFAULT_SIZES);
    1096             :     else
    1097           2 :         MemoryContextReset(hpam_context);
    1098             : 
    1099          12 :     oldcontext = MemoryContextSwitchTo(hpam_context);
    1100             : 
    1101          12 :     ParallelApplyMessagePending = false;
    1102             : 
    1103          24 :     foreach(lc, ParallelApplyWorkerPool)
    1104             :     {
    1105             :         shm_mq_result res;
    1106             :         Size        nbytes;
    1107             :         void       *data;
    1108          14 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
    1109             : 
    1110             :         /*
    1111             :          * The leader will detach from the error queue and set it to NULL
    1112             :          * before preparing to stop all parallel apply workers, so we don't
    1113             :          * need to handle error messages anymore. See
    1114             :          * logicalrep_worker_detach.
    1115             :          */
    1116          14 :         if (!winfo->error_mq_handle)
    1117          12 :             continue;
    1118             : 
    1119           4 :         res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
    1120             : 
    1121           4 :         if (res == SHM_MQ_WOULD_BLOCK)
    1122           2 :             continue;
    1123           2 :         else if (res == SHM_MQ_SUCCESS)
    1124             :         {
    1125             :             StringInfoData msg;
    1126             : 
    1127           2 :             initStringInfo(&msg);
    1128           2 :             appendBinaryStringInfo(&msg, data, nbytes);
    1129           2 :             ProcessParallelApplyMessage(&msg);
    1130           0 :             pfree(msg.data);
    1131             :         }
    1132             :         else
    1133           0 :             ereport(ERROR,
    1134             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1135             :                      errmsg("lost connection to the logical replication parallel apply worker")));
    1136             :     }
    1137             : 
    1138          10 :     MemoryContextSwitchTo(oldcontext);
    1139             : 
    1140             :     /* Might as well clear the context on our way out */
    1141          10 :     MemoryContextReset(hpam_context);
    1142             : 
    1143          10 :     RESUME_INTERRUPTS();
    1144          10 : }
    1145             : 
    1146             : /*
    1147             :  * Send the data to the specified parallel apply worker via the shared-memory
    1148             :  * queue.
    1149             :  *
    1150             :  * Returns false if the attempt to send data via shared memory times out, true
    1151             :  * otherwise.
    1152             :  */
    1153             : bool
    1154      137792 : pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
    1155             : {
    1156             :     int         rc;
    1157             :     shm_mq_result result;
    1158      137792 :     TimestampTz startTime = 0;
    1159             : 
    1160             :     Assert(!IsTransactionState());
    1161             :     Assert(!winfo->serialize_changes);
    1162             : 
    1163             :     /*
    1164             :      * We don't try to send data to the parallel worker in 'immediate' mode;
    1165             :      * that mode is used primarily for testing purposes.
    1166             :      */
    1167      137792 :     if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
    1168           8 :         return false;
    1169             : 
    1170             : /*
    1171             :  * This timeout is a bit arbitrary but testing revealed that it is sufficient
    1172             :  * to send the message unless the parallel apply worker is waiting on some
    1173             :  * lock or there is a serious resource crunch. See the comments atop this file
    1174             :  * for why we send the message in a non-blocking way.
    1175             :  */
    1176             : #define SHM_SEND_RETRY_INTERVAL_MS 1000
    1177             : #define SHM_SEND_TIMEOUT_MS     (10000 - SHM_SEND_RETRY_INTERVAL_MS)
    1178             : 
    1179             :     for (;;)
    1180             :     {
    1181      137784 :         result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
    1182             : 
    1183      137784 :         if (result == SHM_MQ_SUCCESS)
    1184      137784 :             return true;
    1185           0 :         else if (result == SHM_MQ_DETACHED)
    1186           0 :             ereport(ERROR,
    1187             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1188             :                      errmsg("could not send data to shared-memory queue")));
    1189             : 
    1190             :         Assert(result == SHM_MQ_WOULD_BLOCK);
    1191             : 
    1192             :         /* Wait before retrying. */
    1193           0 :         rc = WaitLatch(MyLatch,
    1194             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1195             :                        SHM_SEND_RETRY_INTERVAL_MS,
    1196             :                        WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
    1197             : 
    1198           0 :         if (rc & WL_LATCH_SET)
    1199             :         {
    1200           0 :             ResetLatch(MyLatch);
    1201           0 :             CHECK_FOR_INTERRUPTS();
    1202             :         }
    1203             : 
    1204           0 :         if (startTime == 0)
    1205           0 :             startTime = GetCurrentTimestamp();
    1206           0 :         else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
    1207             :                                             SHM_SEND_TIMEOUT_MS))
    1208           0 :             return false;
    1209             :     }
    1210             : }
    1211             : 
    1212             : /*
    1213             :  * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
    1214             :  * that the current data and any subsequent data for this transaction will be
    1215             :  * serialized to a file. This is done to prevent possible deadlocks with
    1216             :  * another parallel apply worker (refer to the comments atop this file).
    1217             :  */
    1218             : void
    1219           8 : pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
    1220             :                                bool stream_locked)
    1221             : {
    1222           8 :     ereport(LOG,
    1223             :             (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
    1224             :                     winfo->shared->xid)));
    1225             : 
    1226             :     /*
    1227             :      * The parallel apply worker could be stuck for some reason (say, waiting
    1228             :      * on a lock held by another backend), so stop trying to send data
    1229             :      * directly to it and start serializing the data to the file instead.
    1230             :      */
    1231           8 :     winfo->serialize_changes = true;
    1232             : 
    1233             :     /* Initialize the stream fileset. */
    1234           8 :     stream_start_internal(winfo->shared->xid, true);
    1235             : 
    1236             :     /*
    1237             :      * Acquire the stream lock if we do not hold it already, to make sure that
    1238             :      * the parallel apply worker keeps waiting for the leader to release the
    1239             :      * stream lock until the end of the transaction.
    1240             :      */
    1241           8 :     if (!stream_locked)
    1242           8 :         pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
    1243             : 
    1244           8 :     pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
    1245           8 : }
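/*
 * Illustrative sketch (not part of the original file): how a leader-side
 * caller is assumed to combine pa_send_data() and
 * pa_switch_to_partial_serialize().  The serialization of the change after
 * the switch is only hinted at; the real callers live in worker.c.
 */
#ifdef PA_EXAMPLE_ONLY
static void
pa_example_forward_change(ParallelApplyWorkerInfo *winfo, StringInfo s)
{
    /* Try the fast path through the shared-memory queue first. */
    if (pa_send_data(winfo, s->len, s->data))
        return;

    /*
     * The worker did not drain the queue within the timeout, so switch the
     * rest of this transaction to PARTIAL_SERIALIZE mode.  The second
     * argument says whether the caller already holds the stream lock; here
     * we assume it does not.
     */
    pa_switch_to_partial_serialize(winfo, false);
    /* ... now serialize this change (and later ones) to the stream file ... */
}
#endif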
    1246             : 
    1247             : /*
    1248             :  * Wait until the parallel apply worker's transaction state has reached or
    1249             :  * exceeded the given xact_state.
    1250             :  */
    1251             : static void
    1252          50 : pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
    1253             :                        ParallelTransState xact_state)
    1254             : {
    1255             :     for (;;)
    1256             :     {
    1257             :         /*
    1258             :          * Stop if the transaction state has reached or exceeded the given
    1259             :          * xact_state.
    1260             :          */
    1261         584 :         if (pa_get_xact_state(winfo->shared) >= xact_state)
    1262          50 :             break;
    1263             : 
    1264             :         /* Wait to be signalled. */
    1265         534 :         (void) WaitLatch(MyLatch,
    1266             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1267             :                          10L,
    1268             :                          WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
    1269             : 
    1270             :         /* Reset the latch so we don't spin. */
    1271         534 :         ResetLatch(MyLatch);
    1272             : 
    1273             :         /* An interrupt may have occurred while we were waiting. */
    1274         534 :         CHECK_FOR_INTERRUPTS();
    1275             :     }
    1276          50 : }
    1277             : 
    1278             : /*
    1279             :  * Wait until the parallel apply worker's transaction finishes.
    1280             :  */
    1281             : static void
    1282          50 : pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
    1283             : {
    1284             :     /*
    1285             :      * Wait until the parallel apply worker has set the state to
    1286             :      * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
    1287             :      * lock. This prevents the leader apply worker from acquiring the
    1288             :      * transaction lock before the parallel apply worker does.
    1289             :      */
    1290          50 :     pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
    1291             : 
    1292             :     /*
    1293             :      * Wait for the transaction lock to be released. This is required to
    1294             :      * detect deadlock among leader and parallel apply workers. Refer to the
    1295             :      * comments atop this file.
    1296             :      */
    1297          50 :     pa_lock_transaction(winfo->shared->xid, AccessShareLock);
    1298          48 :     pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
    1299             : 
    1300             :     /*
    1301             :      * Check that the state has reached PARALLEL_TRANS_FINISHED, since the
    1302             :      * lock is also released if the parallel apply worker fails while
    1303             :      * applying changes.
    1304             :      */
    1305          48 :     if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
    1306           0 :         ereport(ERROR,
    1307             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1308             :                  errmsg("lost connection to the logical replication parallel apply worker")));
    1309          48 : }
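/*
 * Illustrative sketch (not part of the original file): the handshake the
 * wait above relies on.  The parallel apply worker is assumed to take the
 * transaction lock in AccessExclusiveLock mode before advertising
 * PARALLEL_TRANS_STARTED and to hold it until the transaction ends, so the
 * leader's share-lock acquisition blocks exactly until the worker is done
 * and any cross-worker deadlock is visible to the deadlock detector.
 */
#ifdef PA_EXAMPLE_ONLY
static void
pa_example_worker_begin_streamed_xact(void)
{
    /* Worker side, before applying the first change of the streamed xact. */
    pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
    pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
}
#endif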
    1310             : 
    1311             : /*
    1312             :  * Set the transaction state for a given parallel apply worker.
    1313             :  */
    1314             : void
    1315         102 : pa_set_xact_state(ParallelApplyWorkerShared *wshared,
    1316             :                   ParallelTransState xact_state)
    1317             : {
    1318         102 :     SpinLockAcquire(&wshared->mutex);
    1319         102 :     wshared->xact_state = xact_state;
    1320         102 :     SpinLockRelease(&wshared->mutex);
    1321         102 : }
    1322             : 
    1323             : /*
    1324             :  * Get the transaction state for a given parallel apply worker.
    1325             :  */
    1326             : static ParallelTransState
    1327         632 : pa_get_xact_state(ParallelApplyWorkerShared *wshared)
    1328             : {
    1329             :     ParallelTransState xact_state;
    1330             : 
    1331         632 :     SpinLockAcquire(&wshared->mutex);
    1332         632 :     xact_state = wshared->xact_state;
    1333         632 :     SpinLockRelease(&wshared->mutex);
    1334             : 
    1335         632 :     return xact_state;
    1336             : }
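/*
 * Illustrative sketch (not part of the original file): xact_state is assumed
 * to advance monotonically, which is what lets pa_wait_for_xact_state()
 * above get away with a simple ">=" comparison.
 */
#ifdef PA_EXAMPLE_ONLY
static void
pa_example_xact_state_progression(ParallelApplyWorkerShared *wshared)
{
    /* Hypothetical worker-side progression for one streamed transaction. */
    pa_set_xact_state(wshared, PARALLEL_TRANS_STARTED);
    /* ... apply all changes of the streamed transaction ... */
    pa_set_xact_state(wshared, PARALLEL_TRANS_FINISHED);
}
#endif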
    1337             : 
    1338             : /*
    1339             :  * Cache the parallel apply worker information.
    1340             :  */
    1341             : void
    1342        1012 : pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
    1343             : {
    1344        1012 :     stream_apply_worker = winfo;
    1345        1012 : }
    1346             : 
    1347             : /*
    1348             :  * Form a unique savepoint name for the streaming transaction.
    1349             :  *
    1350             :  * Note that different subscriptions for publications on different nodes can
    1351             :  * receive the same remote xid, so we include the subscription OID as well.
    1352             :  *
    1353             :  * Returns the name in the supplied buffer.
    1354             :  */
    1355             : static void
    1356          54 : pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
    1357             : {
    1358          54 :     snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
    1359          54 : }
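/*
 * Illustrative sketch (not part of the original file): with a made-up
 * subscription OID of 16394 and remote xid 742, the generated savepoint name
 * would be "pg_sp_16394_742".
 */
#ifdef PA_EXAMPLE_ONLY
static void
pa_example_savepoint_name(void)
{
    char        spname[NAMEDATALEN];

    pa_savepoint_name(16394, 742, spname, sizeof(spname));
    Assert(strcmp(spname, "pg_sp_16394_742") == 0);
}
#endif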
    1360             : 
    1361             : /*
    1362             :  * Define a savepoint for a subxact in the parallel apply worker if needed.
    1363             :  *
    1364             :  * The parallel apply worker can detect that a new subtransaction was started
    1365             :  * by checking whether the new change arrived with a different xid. In that
    1366             :  * case it defines a named savepoint, so that we are able to roll back to it
    1367             :  * if required.
    1368             :  */
    1369             : void
    1370      136786 : pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
    1371             : {
    1372      136786 :     if (current_xid != top_xid &&
    1373         104 :         !list_member_xid(subxactlist, current_xid))
    1374             :     {
    1375             :         MemoryContext oldctx;
    1376             :         char        spname[NAMEDATALEN];
    1377             : 
    1378          34 :         pa_savepoint_name(MySubscription->oid, current_xid,
    1379             :                           spname, sizeof(spname));
    1380             : 
    1381          34 :         elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
    1382             : 
    1383             :         /* We must be in a transaction block to define the SAVEPOINT. */
    1384          34 :         if (!IsTransactionBlock())
    1385             :         {
    1386          10 :             if (!IsTransactionState())
    1387           0 :                 StartTransactionCommand();
    1388             : 
    1389          10 :             BeginTransactionBlock();
    1390          10 :             CommitTransactionCommand();
    1391             :         }
    1392             : 
    1393          34 :         DefineSavepoint(spname);
    1394             : 
    1395             :         /*
    1396             :          * CommitTransactionCommand is needed to start a subtransaction after
    1397             :          * issuing a SAVEPOINT inside a transaction block (see
    1398             :          * StartSubTransaction()).
    1399             :          */
    1400          34 :         CommitTransactionCommand();
    1401             : 
    1402          34 :         oldctx = MemoryContextSwitchTo(TopTransactionContext);
    1403          34 :         subxactlist = lappend_xid(subxactlist, current_xid);
    1404          34 :         MemoryContextSwitchTo(oldctx);
    1405             :     }
    1406      136786 : }
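/*
 * Illustrative sketch (not part of the original file): a hypothetical
 * worker-side call site.  Before applying a change of the streamed
 * transaction, the worker is expected to make sure a savepoint exists for
 * the change's subtransaction, if any.
 */
#ifdef PA_EXAMPLE_ONLY
static void
pa_example_before_apply_change(TransactionId change_xid,
                               TransactionId stream_xid)
{
    /* Defines a savepoint only if change_xid is a not-yet-seen subxact. */
    pa_start_subtrans(change_xid, stream_xid);
    /* ... apply the change itself ... */
}
#endif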
    1407             : 
    1408             : /* Reset the list that maintains subtransactions. */
    1409             : void
    1410          48 : pa_reset_subtrans(void)
    1411             : {
    1412             :     /*
    1413             :      * We don't need to free this explicitly as the allocated memory will be
    1414             :      * freed at the transaction end.
    1415             :      */
    1416          48 :     subxactlist = NIL;
    1417          48 : }
    1418             : 
    1419             : /*
    1420             :  * Handle STREAM ABORT message when the transaction was applied in a parallel
    1421             :  * apply worker.
    1422             :  */
    1423             : void
    1424          24 : pa_stream_abort(LogicalRepStreamAbortData *abort_data)
    1425             : {
    1426          24 :     TransactionId xid = abort_data->xid;
    1427          24 :     TransactionId subxid = abort_data->subxid;
    1428             : 
    1429             :     /*
    1430             :      * Update the origin state so we can restart streaming from the correct
    1431             :      * position in case of a crash.
    1432             :      */
    1433          24 :     replorigin_session_origin_lsn = abort_data->abort_lsn;
    1434          24 :     replorigin_session_origin_timestamp = abort_data->abort_time;
    1435             : 
    1436             :     /*
    1437             :      * If the two XIDs are the same, it is in fact an abort of the toplevel
    1438             :      * xact, so just free the subxactlist.
    1439             :      */
    1440          24 :     if (subxid == xid)
    1441             :     {
    1442           4 :         pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    1443             : 
    1444             :         /*
    1445             :          * Release the lock as we might be processing an empty streaming
    1446             :          * transaction in which case the lock won't be released during
    1447             :          * transaction rollback.
    1448             :          *
    1449             :          * Note that it's ok to release the transaction lock before aborting
    1450             :          * the transaction because even if the parallel apply worker dies due
    1451             :          * to crash or some other reason, such a transaction would still be
    1452             :          * considered aborted.
    1453             :          */
    1454           4 :         pa_unlock_transaction(xid, AccessExclusiveLock);
    1455             : 
    1456           4 :         AbortCurrentTransaction();
    1457             : 
    1458           4 :         if (IsTransactionBlock())
    1459             :         {
    1460           2 :             EndTransactionBlock(false);
    1461           2 :             CommitTransactionCommand();
    1462             :         }
    1463             : 
    1464           4 :         pa_reset_subtrans();
    1465             : 
    1466           4 :         pgstat_report_activity(STATE_IDLE, NULL);
    1467             :     }
    1468             :     else
    1469             :     {
    1470             :         /* OK, so it's a subxact. Roll back to the savepoint. */
    1471             :         int         i;
    1472             :         char        spname[NAMEDATALEN];
    1473             : 
    1474          20 :         pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
    1475             : 
    1476          20 :         elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
    1477             : 
    1478             :         /*
    1479             :          * Search the subxactlist, determine the offset tracked for the
    1480             :          * subxact, and truncate the list.
    1481             :          *
    1482             :          * Note that for an empty sub-transaction we won't find the subxid
    1483             :          * here.
    1484             :          */
    1485          24 :         for (i = list_length(subxactlist) - 1; i >= 0; i--)
    1486             :         {
    1487          22 :             TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
    1488             : 
    1489          22 :             if (xid_tmp == subxid)
    1490             :             {
    1491          18 :                 RollbackToSavepoint(spname);
    1492          18 :                 CommitTransactionCommand();
    1493          18 :                 subxactlist = list_truncate(subxactlist, i);
    1494          18 :                 break;
    1495             :             }
    1496             :         }
    1497             :     }
    1498          24 : }
    1499             : 
    1500             : /*
    1501             :  * Set the fileset state for a particular parallel apply worker. The fileset
    1502             :  * itself is set once the leader worker has serialized all changes to the
    1503             :  * file, so that it can be used by the parallel apply worker.
    1504             :  */
    1505             : void
    1506          32 : pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
    1507             :                      PartialFileSetState fileset_state)
    1508             : {
    1509          32 :     SpinLockAcquire(&wshared->mutex);
    1510          32 :     wshared->fileset_state = fileset_state;
    1511             : 
    1512          32 :     if (fileset_state == FS_SERIALIZE_DONE)
    1513             :     {
    1514             :         Assert(am_leader_apply_worker());
    1515             :         Assert(MyLogicalRepWorker->stream_fileset);
    1516           8 :         wshared->fileset = *MyLogicalRepWorker->stream_fileset;
    1517             :     }
    1518             : 
    1519          32 :     SpinLockRelease(&wshared->mutex);
    1520          32 : }
    1521             : 
    1522             : /*
    1523             :  * Get the fileset state for the current parallel apply worker.
    1524             :  */
    1525             : static PartialFileSetState
    1526         140 : pa_get_fileset_state(void)
    1527             : {
    1528             :     PartialFileSetState fileset_state;
    1529             : 
    1530             :     Assert(am_parallel_apply_worker());
    1531             : 
    1532         140 :     SpinLockAcquire(&MyParallelShared->mutex);
    1533         140 :     fileset_state = MyParallelShared->fileset_state;
    1534         140 :     SpinLockRelease(&MyParallelShared->mutex);
    1535             : 
    1536         140 :     return fileset_state;
    1537             : }
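/*
 * Illustrative sketch (not part of the original file): a deliberately
 * simplified worker-side wait for the fileset hand-off.  The real logic also
 * coordinates with the leader through the stream lock, but the basic idea is
 * that the worker cannot replay the spooled changes until the leader has
 * flagged the fileset as FS_SERIALIZE_DONE.
 */
#ifdef PA_EXAMPLE_ONLY
static void
pa_example_wait_for_fileset(void)
{
    while (pa_get_fileset_state() != FS_SERIALIZE_DONE)
    {
        (void) WaitLatch(MyLatch,
                         WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                         10L,
                         WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
        ResetLatch(MyLatch);
        CHECK_FOR_INTERRUPTS();
    }

    /* ... now use MyParallelShared->fileset to apply the spooled changes ... */
}
#endif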
    1538             : 
    1539             : /*
    1540             :  * Helper functions to acquire and release a lock for each stream block.
    1541             :  *
    1542             :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
    1543             :  * stream lock.
    1544             :  *
    1545             :  * Refer to the comments atop this file to see how the stream lock is used.
    1546             :  */
    1547             : void
    1548         554 : pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
    1549             : {
    1550         554 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1551             :                                    PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1552         550 : }
    1553             : 
    1554             : void
    1555         546 : pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
    1556             : {
    1557         546 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1558             :                                      PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1559         546 : }
    1560             : 
    1561             : /*
    1562             :  * Helper functions to acquire and release a lock for each local transaction
    1563             :  * apply.
    1564             :  *
    1565             :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
    1566             :  * transaction lock.
    1567             :  *
    1568             :  * Note that all callers must pass a remote transaction ID as xid, not a
    1569             :  * local one. The local transaction ID is assigned only once the parallel
    1570             :  * apply worker applies the first change, and that first change may itself
    1571             :  * be blocked by a concurrently executing transaction in another parallel
    1572             :  * apply worker. Since the local transaction ID can be communicated to the
    1573             :  * leader only after the first change has been applied, the leader could
    1574             :  * not use a lock based on it to wait after sending the xact finish
    1575             :  * command; hence this lock is keyed on the remote xid.
    1576             :  *
    1577             :  * Refer to the comments atop this file to see how the transaction lock is
    1578             :  * used.
    1579             :  */
    1580             : void
    1581         104 : pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
    1582             : {
    1583         104 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1584             :                                    PARALLEL_APPLY_LOCK_XACT, lockmode);
    1585         102 : }
    1586             : 
    1587             : void
    1588          96 : pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
    1589             : {
    1590          96 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1591             :                                      PARALLEL_APPLY_LOCK_XACT, lockmode);
    1592          96 : }
    1593             : 
    1594             : /*
    1595             :  * Decrement the number of pending streaming blocks and wait on the stream lock
    1596             :  * if there is no pending block available.
    1597             :  */
    1598             : void
    1599         504 : pa_decr_and_wait_stream_block(void)
    1600             : {
    1601             :     Assert(am_parallel_apply_worker());
    1602             : 
    1603             :     /*
    1604             :      * It is only possible to not have any pending stream chunks when we are
    1605             :      * applying spooled messages.
    1606             :      */
    1607         504 :     if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
    1608             :     {
    1609          32 :         if (pa_has_spooled_message_pending())
    1610          32 :             return;
    1611             : 
    1612           0 :         elog(ERROR, "invalid pending streaming chunk 0");
    1613             :     }
    1614             : 
    1615         472 :     if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
    1616             :     {
    1617          52 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
    1618          48 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
    1619             :     }
    1620             : }
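/*
 * Illustrative sketch (not part of the original file): the counter consumed
 * above is assumed to be bumped on the leader side once per streamed chunk,
 * so that the worker only blocks on the stream lock after it has caught up
 * with everything the leader has announced so far.
 */
#ifdef PA_EXAMPLE_ONLY
static void
pa_example_leader_announce_chunk(ParallelApplyWorkerInfo *winfo)
{
    pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
}
#endif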
    1621             : 
    1622             : /*
    1623             :  * Finish processing the streaming transaction in the leader apply worker.
    1624             :  */
    1625             : void
    1626          50 : pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
    1627             : {
    1628             :     Assert(am_leader_apply_worker());
    1629             : 
    1630             :     /*
    1631             :      * Release the stream lock so that the parallel apply worker can continue
    1632             :      * to receive and apply changes.
    1633             :      */
    1634          50 :     pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
    1635             : 
    1636             :     /*
    1637             :      * Wait for that worker to finish. This is necessary to maintain commit
    1638             :      * order which avoids failures due to transaction dependencies and
    1639             :      * deadlocks.
    1640             :      */
    1641          50 :     pa_wait_for_xact_finish(winfo);
    1642             : 
    1643          48 :     if (!XLogRecPtrIsInvalid(remote_lsn))
    1644          44 :         store_flush_position(remote_lsn, winfo->shared->last_commit_end);
    1645             : 
    1646          48 :     pa_free_worker(winfo);
    1647          48 : }
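/*
 * Illustrative sketch (not part of the original file): roughly how the
 * leader is assumed to wrap up a streamed transaction that was handed to a
 * parallel apply worker.  Error handling and the PARTIAL_SERIALIZE details
 * are elided; "s" stands for the received STREAM COMMIT message.
 */
#ifdef PA_EXAMPLE_ONLY
static void
pa_example_handle_stream_commit(ParallelApplyWorkerInfo *winfo, StringInfo s,
                                XLogRecPtr commit_end_lsn)
{
    if (pa_send_data(winfo, s->len, s->data))
    {
        /* Wait for the worker to finish, preserving commit order. */
        pa_xact_finish(winfo, commit_end_lsn);
    }
    else
    {
        /*
         * Could not hand the commit over in time; serialize the remaining
         * work to a file instead (further handling elided).  The second
         * argument says whether we already hold the stream lock.
         */
        pa_switch_to_partial_serialize(winfo, true);
    }
}
#endif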

Generated by: LCOV version 1.16