LCOV - code coverage report
Current view:  top level - src/backend/replication/logical - applyparallelworker.c (source / functions)
Test:          PostgreSQL 19devel
Date:          2025-07-29 03:18:01
               Hit    Total    Coverage
Lines:         368    411     89.5 %
Functions:     36     36      100.0 %
Legend:  Lines:  hit  not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * applyparallelworker.c
       3             :  *     Support routines for applying xact by parallel apply worker
       4             :  *
       5             :  * Copyright (c) 2023-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/applyparallelworker.c
       9             :  *
       10             :  * This file contains the code to launch, set up, and tear down a parallel
       11             :  * apply worker that receives changes from the leader worker and invokes
       12             :  * routines to apply those changes on the subscriber database. Additionally,
       13             :  * this file contains routines to support setting up, using, and tearing down a
      14             :  * ParallelApplyWorkerInfo which is required so the leader worker and parallel
      15             :  * apply workers can communicate with each other.
      16             :  *
      17             :  * The parallel apply workers are assigned (if available) as soon as the
      18             :  * xact's first stream is received for subscriptions that have set their
      19             :  * 'streaming' option to parallel. The leader apply worker will send changes
      20             :  * to this new worker via shared memory. We keep this worker assigned until
      21             :  * the transaction commit is received and also wait for the worker to finish
      22             :  * at commit. This preserves commit ordering and avoids file I/O in most
      23             :  * cases, although we still need to spill to a file if there is no worker
      24             :  * available. See comments atop logical/worker.c to know more about streamed
      25             :  * xacts whose changes are spilled to disk. It is important to maintain
      26             :  * commit order to avoid failures due to: (a) transaction dependencies - say
      27             :  * if we insert a row in the first transaction and update it in the second
      28             :  * transaction on the publisher, then allowing the subscriber to apply both
      29             :  * in parallel can lead to a failure in the update; (b) deadlocks - allowing
      30             :  * transactions that update the same set of rows/tables in the opposite
      31             :  * order to be applied in parallel can lead to deadlocks.
      32             :  *
      33             :  * A worker pool is used to avoid restarting workers for each streaming
      34             :  * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
      35             :  * in the ParallelApplyWorkerPool. After successfully launching a new worker,
      36             :  * its information is added to the ParallelApplyWorkerPool. Once the worker
      37             :  * finishes applying the transaction, it is marked as available for reuse.
      38             :  * Before starting a new worker to apply a streaming transaction, we first
      39             :  * check the list for any available worker. Note that we retain at most half
      40             :  * of max_parallel_apply_workers_per_subscription workers in the pool;
      41             :  * beyond that, a worker simply exits after applying its transaction.
      42             :  *
      43             :  * XXX This worker pool threshold is arbitrary and we can provide a GUC
      44             :  * variable for this in the future if required.
      45             :  *
      46             :  * The leader apply worker will create a separate dynamic shared memory segment
      47             :  * when each parallel apply worker starts. The reason for this design is that
      48             :  * we cannot predict how many workers will be needed. It may be possible to
      49             :  * allocate enough shared memory in one segment based on the maximum number of
      50             :  * parallel apply workers (max_parallel_apply_workers_per_subscription), but
      51             :  * this would waste memory if no process is actually started.
      52             :  *
      53             :  * The dynamic shared memory segment contains: (a) a shm_mq that is used to
      54             :  * send changes in the transaction from leader apply worker to parallel apply
      55             :  * worker; (b) another shm_mq that is used to send errors (and other messages
      56             :  * reported via elog/ereport) from the parallel apply worker to leader apply
      57             :  * worker; (c) necessary information to be shared among parallel apply workers
      58             :  * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
      59             :  *
      60             :  * Locking Considerations
      61             :  * ----------------------
      62             :  * There is a risk of deadlock when transactions that were independent on the
      63             :  * publisher side become dependent on the subscriber side, due to differing
      64             :  * database structures (like the schema of subscription tables, constraints,
      65             :  * etc.) on each side, and are applied concurrently in parallel mode. This
      66             :  * can happen even without parallel mode when there are concurrent operations
      67             :  * on the subscriber. In order to detect deadlocks among the leader (LA) and
      68             :  * parallel apply (PA) workers, we use lmgr locks when the PA waits for the
      69             :  * next stream (set of changes) and LA waits for PA to finish the transaction.
      70             :  * An alternative approach could be to not allow parallelism when the schema of
      71             :  * tables is different between the publisher and subscriber but that would be
      72             :  * too restrictive and would require the publisher to send much more
      73             :  * information than it is currently sending.
      74             :  *
      75             :  * Consider a case where the subscribed table does not have a unique key on the
      76             :  * publisher and has a unique key on the subscriber. The deadlock can happen in
      77             :  * the following ways:
      78             :  *
      79             :  * 1) Deadlock between the leader apply worker and a parallel apply worker
      80             :  *
      81             :  * Consider that the parallel apply worker (PA) is executing TX-1 and the
      82             :  * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
      83             :  * Now, LA is waiting for PA because of the unique key constraint of the
      84             :  * subscribed table while PA is waiting for LA to send the next stream of
      85             :  * changes or transaction finish command message.
      86             :  *
      87             :  * In order for lmgr to detect this, we have LA acquire a session lock on the
      88             :  * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
      89             :  * trying to receive the next stream of changes. Specifically, LA will acquire
      90             :  * the lock in AccessExclusive mode before sending the STREAM_STOP and will
      91             :  * release it, if already acquired, after sending the STREAM_START, STREAM_ABORT
      92             :  * (for a toplevel transaction), STREAM_PREPARE, or STREAM_COMMIT. The PA will
      93             :  * acquire the lock in AccessShare mode after processing STREAM_STOP and
      94             :  * STREAM_ABORT (for subtransaction) and then release the lock immediately
      95             :  * after acquiring it.
      96             :  *
      97             :  * The lock graph for the above example will look as follows:
      98             :  * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
      99             :  * acquire the stream lock) -> LA
     100             :  *
     101             :  * This way, when PA is waiting for LA for the next stream of changes, we can
     102             :  * have a wait-edge from PA to LA in lmgr, which will make us detect the
     103             :  * deadlock between LA and PA.
     104             :  *
     105             :  * 2) Deadlock between the leader apply worker and parallel apply workers
     106             :  *
     107             :  * This scenario is similar to the first case but TX-1 and TX-2 are executed by
     108             :  * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
     109             :  * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
     110             :  * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
     111             :  * transaction in order to preserve the commit order. There is a deadlock among
     112             :  * the three processes.
     113             :  *
     114             :  * In order for lmgr to detect this, we have PA acquire a session lock (this is
     115             :  * a different lock than the one referred to in the previous case, see
     116             :  * pa_lock_transaction()) on the transaction being applied and have LA wait on
     117             :  * the lock before proceeding in the transaction finish commands. Specifically,
     118             :  * PA will acquire this lock in AccessExclusive mode before executing the first
     119             :  * message of the transaction and release it at the xact end. LA will acquire
     120             :  * this lock in AccessShare mode at transaction finish commands and release it
     121             :  * immediately.
     122             :  *
     123             :  * The lock graph for the above example will look as follows:
     124             :  * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
     125             :  * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
     126             :  * lock) -> LA
     127             :  *
     128             :  * This way, when LA is waiting to finish the transaction end command to
     129             :  * preserve the commit order, we will be able to detect a deadlock, if any.
     130             :  *
     131             :  * One might think we can use XactLockTableWait(), but XactLockTableWait()
     132             :  * considers a PREPARED TRANSACTION as still in progress, which means the lock
     133             :  * won't be released even after the parallel apply worker has prepared the
     134             :  * transaction.
     135             :  *
     136             :  * 3) Deadlock when the shm_mq buffer is full
     137             :  *
     138             :  * In the previous scenario (i.e., PA-1 and PA-2 are executing transactions
     139             :  * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
     140             :  * wait to send messages, and this wait doesn't appear in lmgr.
     141             :  *
     142             :  * To avoid this wait, we use a non-blocking write and wait with a timeout. If
     143             :  * the timeout is exceeded, the LA will serialize all the pending messages to
     144             :  * a file and indicate to PA-2 that it needs to read that file for the
     145             :  * remaining messages. Then LA will start waiting for the commit as in the
     146             :  * previous case, which will detect a deadlock, if any. See pa_send_data()
     147             :  * and enum TransApplyAction.
     148             :  *
     149             :  * Lock types
     150             :  * ----------
     151             :  * Both the stream lock and the transaction lock mentioned above are
     152             :  * session-level locks: both could be acquired outside a transaction, and the
     153             :  * stream lock in the leader needs to persist across transaction boundaries,
     154             :  * i.e. until the end of the streaming transaction. See the sketch below.
     155             :  *-------------------------------------------------------------------------
     156             :  */
     157             : 
     158             : #include "postgres.h"
     159             : 
     160             : #include "libpq/pqformat.h"
     161             : #include "libpq/pqmq.h"
     162             : #include "pgstat.h"
     163             : #include "postmaster/interrupt.h"
     164             : #include "replication/logicallauncher.h"
     165             : #include "replication/logicalworker.h"
     166             : #include "replication/origin.h"
     167             : #include "replication/worker_internal.h"
     168             : #include "storage/ipc.h"
     169             : #include "storage/lmgr.h"
     170             : #include "tcop/tcopprot.h"
     171             : #include "utils/inval.h"
     172             : #include "utils/memutils.h"
     173             : #include "utils/syscache.h"
     174             : 
     175             : #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
     176             : 
     177             : /*
     178             :  * DSM keys for the parallel apply worker. Unlike other parallel execution
     179             :  * code, we don't need to worry about DSM keys conflicting with plan_node_id,
     180             :  * so we can use small integers.
     181             :  */
     182             : #define PARALLEL_APPLY_KEY_SHARED       1
     183             : #define PARALLEL_APPLY_KEY_MQ           2
     184             : #define PARALLEL_APPLY_KEY_ERROR_QUEUE  3
     185             : 
     186             : /* Queue size of DSM, 16 MB for now. */
     187             : #define DSM_QUEUE_SIZE  (16 * 1024 * 1024)
     188             : 
     189             : /*
     190             :  * Error queue size of DSM. It is desirable to make it large enough that a
     191             :  * typical ErrorResponse can be sent without blocking. That way, a worker that
     192             :  * errors out can write the whole message into the queue and terminate without
     193             :  * waiting for the leader apply worker.
     194             :  */
     195             : #define DSM_ERROR_QUEUE_SIZE            (16 * 1024)
     196             : 
     197             : /*
     198             :  * There are three fields in each message received by the parallel apply
     199             :  * worker: start_lsn, end_lsn and send_time. Because these statistics have
     200             :  * already been updated by the leader apply worker, the parallel apply worker
     201             :  * can ignore these fields (see function LogicalRepApplyLoop).
     202             :  */
     203             : #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
     204             : 
     205             : /*
     206             :  * The types of session-level locks on a transaction being applied on a
     207             :  * logical replication subscriber.
     208             :  */
     209             : #define PARALLEL_APPLY_LOCK_STREAM  0
     210             : #define PARALLEL_APPLY_LOCK_XACT    1
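
To make the locking protocol described atop this file concrete, here is a short illustrative sketch. It is not part of the measured source: the example_* function names are hypothetical, and it assumes the lmgr session-lock helpers LockApplyTransactionForSession() and UnlockApplyTransactionForSession() (declared in storage/lmgr.h), on which the pa_lock_stream()/pa_unlock_stream() wrappers defined later in this file are built.

/*
 * Sketch: the stream-lock handshake that creates the PA -> LA wait-edge
 * visible to lmgr's deadlock detector.  The lock is keyed by
 * (subscription oid, remote xid, PARALLEL_APPLY_LOCK_STREAM).
 */
static void
example_leader_before_stream_stop(TransactionId xid)
{
	/* LA: acquire before sending STREAM_STOP so that PA waits on LA. */
	LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
								   PARALLEL_APPLY_LOCK_STREAM,
								   AccessExclusiveLock);
}

static void
example_pa_after_stream_stop(TransactionId xid)
{
	/*
	 * PA: blocks until LA releases the stream lock; this is the wait-edge
	 * lmgr can see.  The lock is released immediately once acquired.
	 */
	LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
								   PARALLEL_APPLY_LOCK_STREAM,
								   AccessShareLock);
	UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
									 PARALLEL_APPLY_LOCK_STREAM,
									 AccessShareLock);
}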
     211             : 
     212             : /*
     213             :  * Hash table entry to map xid to the parallel apply worker state.
     214             :  */
     215             : typedef struct ParallelApplyWorkerEntry
     216             : {
     217             :     TransactionId xid;          /* Hash key -- must be first */
     218             :     ParallelApplyWorkerInfo *winfo;
     219             : } ParallelApplyWorkerEntry;
     220             : 
     221             : /*
     222             :  * A hash table used to cache the state of streaming transactions being applied
     223             :  * by the parallel apply workers.
     224             :  */
     225             : static HTAB *ParallelApplyTxnHash = NULL;
     226             : 
     227             : /*
     228             :  * A list (pool) of active parallel apply workers. The information for
     229             :  * the new worker is added to the list after successfully launching it. The
     230             :  * list entry is removed if there are already enough workers in the worker
     231             :  * pool at the end of the transaction. For more information about the worker
     232             :  * pool, see comments atop this file.
     233             :  */
     234             : static List *ParallelApplyWorkerPool = NIL;
     235             : 
     236             : /*
     237             :  * Information shared between leader apply worker and parallel apply worker.
     238             :  */
     239             : ParallelApplyWorkerShared *MyParallelShared = NULL;
     240             : 
     241             : /*
     242             :  * Is there a message sent by a parallel apply worker that the leader apply
     243             :  * worker needs to receive?
     244             :  */
     245             : volatile sig_atomic_t ParallelApplyMessagePending = false;
     246             : 
     247             : /*
     248             :  * Cache the parallel apply worker information required for applying the
     249             :  * current streaming transaction. It is used to save the cost of searching the
     250             :  * hash table when applying the changes between STREAM_START and STREAM_STOP.
     251             :  */
     252             : static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
     253             : 
     254             : /* A list to maintain subtransactions, if any. */
     255             : static List *subxactlist = NIL;
     256             : 
     257             : static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
     258             : static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
     259             : static PartialFileSetState pa_get_fileset_state(void);
     260             : 
     261             : /*
     262             :  * Returns true if it is OK to start a parallel apply worker, false otherwise.
     263             :  */
     264             : static bool
     265         164 : pa_can_start(void)
     266             : {
     267             :     /* Only leader apply workers can start parallel apply workers. */
     268         164 :     if (!am_leader_apply_worker())
     269          54 :         return false;
     270             : 
     271             :     /*
     272             :      * It is good to check for any change in the subscription parameters to
     273             :      * avoid the case where a change doesn't get reflected for a very long
     274             :      * time. This can happen when there is a constant flow of streaming
     275             :      * transactions that are handled by parallel apply workers.
     276             :      *
     277             :      * It is better to do this before the checks below so that the latest
     278             :      * subscription values can be used for them.
     279             :      */
     280         110 :     maybe_reread_subscription();
     281             : 
     282             :     /*
     283             :      * Don't start a new parallel apply worker if the subscription is not
     284             :      * using parallel streaming mode, or if the publisher does not support
     285             :      * parallel apply.
     286             :      */
     287         110 :     if (!MyLogicalRepWorker->parallel_apply)
     288          56 :         return false;
     289             : 
     290             :     /*
     291             :      * Don't start a new parallel worker if the user has set skiplsn as it's
     292             :      * possible that they want to skip the streaming transaction. For
     293             :      * streaming transactions, we need to serialize the transaction to a file
     294             :      * so that we can get the last LSN of the transaction to judge whether to
     295             :      * skip before starting to apply the change.
     296             :      *
     297             :      * One might think that we could allow parallelism if the first lsn of the
     298             :      * transaction is greater than skiplsn, but we don't send it with the
     299             :      * STREAM START message, and it doesn't seem worth sending the extra eight
     300             :      * bytes with the STREAM START to enable parallelism for this case.
     301             :      */
     302          54 :     if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
     303           0 :         return false;
     304             : 
     305             :     /*
     306             :      * For streaming transactions that are being applied using a parallel
     307             :      * apply worker, we cannot decide whether to apply the change for a
     308             :      * relation that is not in the READY state (see
     309             :      * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
     310             :      * time. So, we don't start the new parallel apply worker in this case.
     311             :      */
     312          54 :     if (!AllTablesyncsReady())
     313           0 :         return false;
     314             : 
     315          54 :     return true;
     316             : }
     317             : 
     318             : /*
     319             :  * Set up a dynamic shared memory segment.
     320             :  *
     321             :  * We set up a control region that contains a fixed-size worker info
     322             :  * (ParallelApplyWorkerShared), a message queue, and an error queue.
     323             :  *
     324             :  * Returns true on success, false on failure.
     325             :  */
     326             : static bool
     327          20 : pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
     328             : {
     329             :     shm_toc_estimator e;
     330             :     Size        segsize;
     331             :     dsm_segment *seg;
     332             :     shm_toc    *toc;
     333             :     ParallelApplyWorkerShared *shared;
     334             :     shm_mq     *mq;
     335          20 :     Size        queue_size = DSM_QUEUE_SIZE;
     336          20 :     Size        error_queue_size = DSM_ERROR_QUEUE_SIZE;
     337             : 
     338             :     /*
     339             :      * Estimate how much shared memory we need.
     340             :      *
     341             :      * Because the TOC machinery may choose to insert padding of oddly-sized
     342             :      * requests, we must estimate each chunk separately.
     343             :      *
     344             :      * We need one key to register the location of the header, and two other
     345             :      * keys to track the locations of the message queue and the error message
     346             :      * queue.
     347             :      */
     348          20 :     shm_toc_initialize_estimator(&e);
     349          20 :     shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
     350          20 :     shm_toc_estimate_chunk(&e, queue_size);
     351          20 :     shm_toc_estimate_chunk(&e, error_queue_size);
     352             : 
     353          20 :     shm_toc_estimate_keys(&e, 3);
     354          20 :     segsize = shm_toc_estimate(&e);
     355             : 
     356             :     /* Create the shared memory segment and establish a table of contents. */
     357          20 :     seg = dsm_create(shm_toc_estimate(&e), 0);
     358          20 :     if (!seg)
     359           0 :         return false;
     360             : 
     361          20 :     toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
     362             :                          segsize);
     363             : 
     364             :     /* Set up the header region. */
     365          20 :     shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
     366          20 :     SpinLockInit(&shared->mutex);
     367             : 
     368          20 :     shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     369          20 :     pg_atomic_init_u32(&(shared->pending_stream_count), 0);
     370          20 :     shared->last_commit_end = InvalidXLogRecPtr;
     371          20 :     shared->fileset_state = FS_EMPTY;
     372             : 
     373          20 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
     374             : 
     375             :     /* Set up message queue for the worker. */
     376          20 :     mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
     377          20 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
     378          20 :     shm_mq_set_sender(mq, MyProc);
     379             : 
     380             :     /* Attach the queue. */
     381          20 :     winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
     382             : 
     383             :     /* Set up error queue for the worker. */
     384          20 :     mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
     385             :                        error_queue_size);
     386          20 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
     387          20 :     shm_mq_set_receiver(mq, MyProc);
     388             : 
     389             :     /* Attach the queue. */
     390          20 :     winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
     391             : 
     392             :     /* Return results to caller. */
     393          20 :     winfo->dsm_seg = seg;
     394          20 :     winfo->shared = shared;
     395             : 
     396          20 :     return true;
     397             : }
     398             : 
     399             : /*
     400             :  * Try to get a parallel apply worker from the pool. If none is available then
     401             :  * start a new one.
     402             :  */
     403             : static ParallelApplyWorkerInfo *
     404          54 : pa_launch_parallel_worker(void)
     405             : {
     406             :     MemoryContext oldcontext;
     407             :     bool        launched;
     408             :     ParallelApplyWorkerInfo *winfo;
     409             :     ListCell   *lc;
     410             : 
     411             :     /* Try to get an available parallel apply worker from the worker pool. */
     412          58 :     foreach(lc, ParallelApplyWorkerPool)
     413             :     {
     414          38 :         winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     415             : 
     416          38 :         if (!winfo->in_use)
     417          34 :             return winfo;
     418             :     }
     419             : 
     420             :     /*
     421             :      * Start a new parallel apply worker.
     422             :      *
     423             :      * The worker info can be used for the lifetime of the worker process, so
     424             :      * create it in a permanent context.
     425             :      */
     426          20 :     oldcontext = MemoryContextSwitchTo(ApplyContext);
     427             : 
     428          20 :     winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
     429             : 
     430             :     /* Setup shared memory. */
     431          20 :     if (!pa_setup_dsm(winfo))
     432             :     {
     433           0 :         MemoryContextSwitchTo(oldcontext);
     434           0 :         pfree(winfo);
     435           0 :         return NULL;
     436             :     }
     437             : 
     438          20 :     launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
     439          20 :                                         MyLogicalRepWorker->dbid,
     440          20 :                                         MySubscription->oid,
     441          20 :                                         MySubscription->name,
     442          20 :                                         MyLogicalRepWorker->userid,
     443             :                                         InvalidOid,
     444             :                                         dsm_segment_handle(winfo->dsm_seg),
     445             :                                         false);
     446             : 
     447          20 :     if (launched)
     448             :     {
     449          20 :         ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
     450             :     }
     451             :     else
     452             :     {
     453           0 :         pa_free_worker_info(winfo);
     454           0 :         winfo = NULL;
     455             :     }
     456             : 
     457          20 :     MemoryContextSwitchTo(oldcontext);
     458             : 
     459          20 :     return winfo;
     460             : }
     461             : 
     462             : /*
     463             :  * Allocate a parallel apply worker that will be used for the specified xid.
     464             :  *
     465             :  * We first try to get an available worker from the pool; if none is
     466             :  * available, we launch a new one. On successful allocation, remember the worker
     467             :  * information in the hash table so that we can get it later for processing the
     468             :  * streaming changes.
     469             :  */
     470             : void
     471         164 : pa_allocate_worker(TransactionId xid)
     472             : {
     473             :     bool        found;
     474         164 :     ParallelApplyWorkerInfo *winfo = NULL;
     475             :     ParallelApplyWorkerEntry *entry;
     476             : 
     477         164 :     if (!pa_can_start())
     478         110 :         return;
     479             : 
     480          54 :     winfo = pa_launch_parallel_worker();
     481          54 :     if (!winfo)
     482           0 :         return;
     483             : 
     484             :     /* First time through, initialize parallel apply worker state hashtable. */
     485          54 :     if (!ParallelApplyTxnHash)
     486             :     {
     487             :         HASHCTL     ctl;
     488             : 
     489         182 :         MemSet(&ctl, 0, sizeof(ctl));
     490          14 :         ctl.keysize = sizeof(TransactionId);
     491          14 :         ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
     492          14 :         ctl.hcxt = ApplyContext;
     493             : 
     494          14 :         ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
     495             :                                            16, &ctl,
     496             :                                            HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     497             :     }
     498             : 
     499             :     /* Create an entry for the requested transaction. */
     500          54 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
     501          54 :     if (found)
     502           0 :         elog(ERROR, "hash table corrupted");
     503             : 
     504             :     /* Update the transaction information in shared memory. */
     505          54 :     SpinLockAcquire(&winfo->shared->mutex);
     506          54 :     winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     507          54 :     winfo->shared->xid = xid;
     508          54 :     SpinLockRelease(&winfo->shared->mutex);
     509             : 
     510          54 :     winfo->in_use = true;
     511          54 :     winfo->serialize_changes = false;
     512          54 :     entry->winfo = winfo;
     513             : }
     514             : 
     515             : /*
     516             :  * Find the assigned worker for the given transaction, if any.
     517             :  */
     518             : ParallelApplyWorkerInfo *
     519      520652 : pa_find_worker(TransactionId xid)
     520             : {
     521             :     bool        found;
     522             :     ParallelApplyWorkerEntry *entry;
     523             : 
     524      520652 :     if (!TransactionIdIsValid(xid))
     525      166278 :         return NULL;
     526             : 
     527      354374 :     if (!ParallelApplyTxnHash)
     528      206476 :         return NULL;
     529             : 
     530             :     /* Return the cached parallel apply worker if valid. */
     531      147898 :     if (stream_apply_worker)
     532      147306 :         return stream_apply_worker;
     533             : 
     534             :     /* Find an entry for the requested transaction. */
     535         592 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
     536         592 :     if (found)
     537             :     {
     538             :         /* The worker must not have exited.  */
     539             :         Assert(entry->winfo->in_use);
     540         592 :         return entry->winfo;
     541             :     }
     542             : 
     543           0 :     return NULL;
     544             : }
     545             : 
     546             : /*
     547             :  * Makes the worker available for reuse.
     548             :  *
     549             :  * This removes the parallel apply worker entry from the hash table so that the
     550             :  * worker can no longer be looked up. If there are enough workers in the pool,
     551             :  * it stops the worker and frees the corresponding info. Otherwise it just
     552             :  * marks the worker as available for reuse.
     553             :  *
     554             :  * For more information about the worker pool, see comments atop this file.
     555             :  */
     556             : static void
     557          48 : pa_free_worker(ParallelApplyWorkerInfo *winfo)
     558             : {
     559             :     Assert(!am_parallel_apply_worker());
     560             :     Assert(winfo->in_use);
     561             :     Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
     562             : 
     563          48 :     if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
     564           0 :         elog(ERROR, "hash table corrupted");
     565             : 
     566             :     /*
     567             :      * Stop the worker if there are enough workers in the pool.
     568             :      *
     569             :      * XXX Additionally, we also stop the worker if the leader apply worker
     570             :  * serialized part of the transaction data due to a send timeout. This is
     571             :      * because the message could be partially written to the queue and there
     572             :      * is no way to clean the queue other than resending the message until it
     573             :      * succeeds. Instead of trying to send the data which anyway would have
     574             :      * been serialized and then letting the parallel apply worker deal with
     575             :      * the spurious message, we stop the worker.
     576             :      */
     577          48 :     if (winfo->serialize_changes ||
     578          40 :         list_length(ParallelApplyWorkerPool) >
     579          40 :         (max_parallel_apply_workers_per_subscription / 2))
     580             :     {
     581          10 :         logicalrep_pa_worker_stop(winfo);
     582          10 :         pa_free_worker_info(winfo);
     583             : 
     584          10 :         return;
     585             :     }
     586             : 
     587          38 :     winfo->in_use = false;
     588          38 :     winfo->serialize_changes = false;
     589             : }
     590             : 
     591             : /*
     592             :  * Free the parallel apply worker information and unlink the files with
     593             :  * serialized changes if any.
     594             :  */
     595             : static void
     596          10 : pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
     597             : {
     598             :     Assert(winfo);
     599             : 
     600          10 :     if (winfo->mq_handle)
     601          10 :         shm_mq_detach(winfo->mq_handle);
     602             : 
     603          10 :     if (winfo->error_mq_handle)
     604           0 :         shm_mq_detach(winfo->error_mq_handle);
     605             : 
     606             :     /* Unlink the files with serialized changes. */
     607          10 :     if (winfo->serialize_changes)
     608           8 :         stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
     609             : 
     610          10 :     if (winfo->dsm_seg)
     611          10 :         dsm_detach(winfo->dsm_seg);
     612             : 
     613             :     /* Remove from the worker pool. */
     614          10 :     ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
     615             : 
     616          10 :     pfree(winfo);
     617          10 : }
     618             : 
     619             : /*
     620             :  * Detach the error queue for all parallel apply workers.
     621             :  */
     622             : void
     623         514 : pa_detach_all_error_mq(void)
     624             : {
     625             :     ListCell   *lc;
     626             : 
     627         524 :     foreach(lc, ParallelApplyWorkerPool)
     628             :     {
     629          10 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     630             : 
     631          10 :         if (winfo->error_mq_handle)
     632             :         {
     633          10 :             shm_mq_detach(winfo->error_mq_handle);
     634          10 :             winfo->error_mq_handle = NULL;
     635             :         }
     636             :     }
     637         514 : }
     638             : 
     639             : /*
     640             :  * Check if there are any pending spooled messages.
     641             :  */
     642             : static bool
     643          32 : pa_has_spooled_message_pending(void)
     644             : {
     645             :     PartialFileSetState fileset_state;
     646             : 
     647          32 :     fileset_state = pa_get_fileset_state();
     648             : 
     649          32 :     return (fileset_state != FS_EMPTY);
     650             : }
     651             : 
     652             : /*
     653             :  * Replay the spooled messages once the leader apply worker has finished
     654             :  * serializing changes to the file.
     655             :  *
     656             :  * Returns false if there aren't any pending spooled messages, true otherwise.
     657             :  */
     658             : static bool
     659         110 : pa_process_spooled_messages_if_required(void)
     660             : {
     661             :     PartialFileSetState fileset_state;
     662             : 
     663         110 :     fileset_state = pa_get_fileset_state();
     664             : 
     665         110 :     if (fileset_state == FS_EMPTY)
     666          94 :         return false;
     667             : 
     668             :     /*
     669             :      * If the leader apply worker is busy serializing the partial changes then
     670             :      * acquire the stream lock now and wait for the leader worker to finish
     671             :      * serializing the changes. Otherwise, the parallel apply worker won't get
     672             :      * a chance to receive a STREAM_STOP (and acquire the stream lock) until
     673             :      * the leader has serialized all changes, which can lead to an undetected
     674             :      * deadlock.
     675             :      *
     676             :      * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
     677             :      * worker has finished serializing the changes.
     678             :      */
     679          16 :     if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
     680             :     {
     681           0 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
     682           0 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
     683             : 
     684           0 :         fileset_state = pa_get_fileset_state();
     685             :     }
     686             : 
     687             :     /*
     688             :      * We cannot read the file immediately after the leader has serialized all
     689             :      * changes to the file because there may still be messages in the memory
     690             :      * queue. We will apply all spooled messages the next time we call this
     691             :      * function and that will ensure there are no messages left in the memory
     692             :      * queue.
     693             :      */
     694          16 :     if (fileset_state == FS_SERIALIZE_DONE)
     695             :     {
     696           8 :         pa_set_fileset_state(MyParallelShared, FS_READY);
     697             :     }
     698           8 :     else if (fileset_state == FS_READY)
     699             :     {
     700           8 :         apply_spooled_messages(&MyParallelShared->fileset,
     701           8 :                                MyParallelShared->xid,
     702             :                                InvalidXLogRecPtr);
     703           8 :         pa_set_fileset_state(MyParallelShared, FS_EMPTY);
     704             :     }
     705             : 
     706          16 :     return true;
     707             : }
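
The sending side of this spill protocol is pa_send_data(), which appears later in this file. As a hedged sketch (not the measured implementation): it retries a non-blocking shm_mq_send() and, once a timeout elapses, returns false so the caller can switch to serializing the remaining changes to a file, as described in "Deadlock when the shm_mq buffer is full" atop this file. The function name and SHM_SEND_TIMEOUT_MS constant below are hypothetical, and the sketch assumes utils/timestamp.h for the timestamp helpers.

#define SHM_SEND_TIMEOUT_MS 10000	/* hypothetical timeout, in ms */

static bool
example_send_with_timeout(ParallelApplyWorkerInfo *winfo, Size nbytes,
						  const void *data)
{
	TimestampTz start = GetCurrentTimestamp();

	for (;;)
	{
		shm_mq_result result;

		/* nowait = true: a full queue returns SHM_MQ_WOULD_BLOCK. */
		result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);

		if (result == SHM_MQ_SUCCESS)
			return true;
		else if (result == SHM_MQ_DETACHED)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("could not send data to shared-memory queue")));

		/* Queue full: give up after a while; the caller spills to a file. */
		if (TimestampDifferenceExceeds(start, GetCurrentTimestamp(),
									   SHM_SEND_TIMEOUT_MS))
			return false;

		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 10L, WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
		ResetLatch(MyLatch);
	}
}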
     708             : 
     709             : /*
     710             :  * Interrupt handler for main loop of parallel apply worker.
     711             :  */
     712             : static void
     713      127902 : ProcessParallelApplyInterrupts(void)
     714             : {
     715      127902 :     CHECK_FOR_INTERRUPTS();
     716             : 
     717      127896 :     if (ShutdownRequestPending)
     718             :     {
     719          10 :         ereport(LOG,
     720             :                 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
     721             :                         MySubscription->name)));
     722             : 
     723          10 :         proc_exit(0);
     724             :     }
     725             : 
     726      127886 :     if (ConfigReloadPending)
     727             :     {
     728           8 :         ConfigReloadPending = false;
     729           8 :         ProcessConfigFile(PGC_SIGHUP);
     730             :     }
     731      127886 : }
     732             : 
     733             : /* Parallel apply worker main loop. */
     734             : static void
     735          20 : LogicalParallelApplyLoop(shm_mq_handle *mqh)
     736             : {
     737             :     shm_mq_result shmq_res;
     738             :     ErrorContextCallback errcallback;
     739          20 :     MemoryContext oldcxt = CurrentMemoryContext;
     740             : 
     741             :     /*
     742             :      * Init the ApplyMessageContext which we clean up after each replication
     743             :      * protocol message.
     744             :      */
     745          20 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
     746             :                                                 "ApplyMessageContext",
     747             :                                                 ALLOCSET_DEFAULT_SIZES);
     748             : 
     749             :     /*
     750             :      * Push apply error context callback. Fields will be filled while applying
     751             :      * a change.
     752             :      */
     753          20 :     errcallback.callback = apply_error_callback;
     754          20 :     errcallback.previous = error_context_stack;
     755          20 :     error_context_stack = &errcallback;
     756             : 
     757             :     for (;;)
     758      127882 :     {
     759             :         void       *data;
     760             :         Size        len;
     761             : 
     762      127902 :         ProcessParallelApplyInterrupts();
     763             : 
     764             :         /* Ensure we are reading the data into our memory context. */
     765      127886 :         MemoryContextSwitchTo(ApplyMessageContext);
     766             : 
     767      127886 :         shmq_res = shm_mq_receive(mqh, &len, &data, true);
     768             : 
     769      127886 :         if (shmq_res == SHM_MQ_SUCCESS)
     770             :         {
     771             :             StringInfoData s;
     772             :             int         c;
     773             : 
     774      127776 :             if (len == 0)
     775           0 :                 elog(ERROR, "invalid message length");
     776             : 
     777      127776 :             initReadOnlyStringInfo(&s, data, len);
     778             : 
     779             :             /*
     780             :              * The first byte of messages sent from leader apply worker to
     781             :              * parallel apply workers can only be 'w'.
     782             :              */
     783      127776 :             c = pq_getmsgbyte(&s);
     784      127776 :             if (c != 'w')
     785           0 :                 elog(ERROR, "unexpected message \"%c\"", c);
     786             : 
     787             :             /*
     788             :              * Ignore statistics fields that have been updated by the leader
     789             :              * apply worker.
     790             :              *
     791             :              * XXX We can avoid sending the statistics fields from the leader
     792             :              * apply worker but for that, it needs to rebuild the entire
     793             :              * message by removing these fields which could be more work than
     794             :              * simply ignoring these fields in the parallel apply worker.
     795             :              */
     796      127776 :             s.cursor += SIZE_STATS_MESSAGE;
     797             : 
     798      127776 :             apply_dispatch(&s);
     799             :         }
     800         110 :         else if (shmq_res == SHM_MQ_WOULD_BLOCK)
     801             :         {
     802             :             /* Replay the changes from the file, if any. */
     803         110 :             if (!pa_process_spooled_messages_if_required())
     804             :             {
     805             :                 int         rc;
     806             : 
     807             :                 /* Wait for more work. */
     808          94 :                 rc = WaitLatch(MyLatch,
     809             :                                WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     810             :                                1000L,
     811             :                                WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
     812             : 
     813          94 :                 if (rc & WL_LATCH_SET)
     814          84 :                     ResetLatch(MyLatch);
     815             :             }
     816             :         }
     817             :         else
     818             :         {
     819             :             Assert(shmq_res == SHM_MQ_DETACHED);
     820             : 
     821           0 :             ereport(ERROR,
     822             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     823             :                      errmsg("lost connection to the logical replication apply worker")));
     824             :         }
     825             : 
     826      127882 :         MemoryContextReset(ApplyMessageContext);
     827      127882 :         MemoryContextSwitchTo(oldcxt);
     828             :     }
     829             : 
     830             :     /* Pop the error context stack. */
     831             :     error_context_stack = errcallback.previous;
     832             : 
     833             :     MemoryContextSwitchTo(oldcxt);
     834             : }
     835             : 
     836             : /*
     837             :  * Make sure the leader apply worker tries to read from our error queue one more
     838             :  * time. This guards against the case where we exit uncleanly without sending
     839             :  * an ErrorResponse, for example because some code calls proc_exit directly.
     840             :  *
     841             :  * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
     842             :  * if any. See ParallelWorkerShutdown for details.
     843             :  */
     844             : static void
     845          20 : pa_shutdown(int code, Datum arg)
     846             : {
     847          20 :     SendProcSignal(MyLogicalRepWorker->leader_pid,
     848             :                    PROCSIG_PARALLEL_APPLY_MESSAGE,
     849             :                    INVALID_PROC_NUMBER);
     850             : 
     851          20 :     dsm_detach((dsm_segment *) DatumGetPointer(arg));
     852          20 : }
     853             : 
     854             : /*
     855             :  * Parallel apply worker entry point.
     856             :  */
     857             : void
     858          20 : ParallelApplyWorkerMain(Datum main_arg)
     859             : {
     860             :     ParallelApplyWorkerShared *shared;
     861             :     dsm_handle  handle;
     862             :     dsm_segment *seg;
     863             :     shm_toc    *toc;
     864             :     shm_mq     *mq;
     865             :     shm_mq_handle *mqh;
     866             :     shm_mq_handle *error_mqh;
     867             :     RepOriginId originid;
     868          20 :     int         worker_slot = DatumGetInt32(main_arg);
     869             :     char        originname[NAMEDATALEN];
     870             : 
     871          20 :     InitializingApplyWorker = true;
     872             : 
     873             :     /* Setup signal handling. */
     874          20 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     875          20 :     pqsignal(SIGINT, SignalHandlerForShutdownRequest);
     876          20 :     pqsignal(SIGTERM, die);
     877          20 :     BackgroundWorkerUnblockSignals();
     878             : 
     879             :     /*
     880             :      * Attach to the dynamic shared memory segment for the parallel apply, and
     881             :      * find its table of contents.
     882             :      *
     883             :      * Like parallel query, we don't need resource owner by this time. See
     884             :      * ParallelWorkerMain.
     885             :      */
     886          20 :     memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
     887          20 :     seg = dsm_attach(handle);
     888          20 :     if (!seg)
     889           0 :         ereport(ERROR,
     890             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     891             :                  errmsg("could not map dynamic shared memory segment")));
     892             : 
     893          20 :     toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
     894          20 :     if (!toc)
     895           0 :         ereport(ERROR,
     896             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     897             :                  errmsg("invalid magic number in dynamic shared memory segment")));
     898             : 
     899             :     /* Look up the shared information. */
     900          20 :     shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
     901          20 :     MyParallelShared = shared;
     902             : 
     903             :     /*
     904             :      * Attach to the message queue.
     905             :      */
     906          20 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
     907          20 :     shm_mq_set_receiver(mq, MyProc);
     908          20 :     mqh = shm_mq_attach(mq, seg, NULL);
     909             : 
     910             :     /*
     911             :      * Primary initialization is complete. Now, we can attach to our slot.
     912             :      * This is to ensure that the leader apply worker does not write data to
     913             :      * the uninitialized memory queue.
     914             :      */
     915          20 :     logicalrep_worker_attach(worker_slot);
     916             : 
     917             :     /*
     918             :      * Register the shutdown callback after we are attached to the worker
     919             :      * slot. This is to ensure that MyLogicalRepWorker remains valid when this
     920             :      * callback is invoked.
     921             :      */
     922          20 :     before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
     923             : 
     924          20 :     SpinLockAcquire(&MyParallelShared->mutex);
     925          20 :     MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
     926          20 :     MyParallelShared->logicalrep_worker_slot_no = worker_slot;
     927          20 :     SpinLockRelease(&MyParallelShared->mutex);
     928             : 
     929             :     /*
     930             :      * Attach to the error queue.
     931             :      */
     932          20 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
     933          20 :     shm_mq_set_sender(mq, MyProc);
     934          20 :     error_mqh = shm_mq_attach(mq, seg, NULL);
     935             : 
     936          20 :     pq_redirect_to_shm_mq(seg, error_mqh);
     937          20 :     pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
     938             :                            INVALID_PROC_NUMBER);
     939             : 
     940          20 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
     941          20 :         MyLogicalRepWorker->reply_time = 0;
     942             : 
     943          20 :     InitializeLogRepWorker();
     944             : 
     945          20 :     InitializingApplyWorker = false;
     946             : 
     947             :     /* Setup replication origin tracking. */
     948          20 :     StartTransactionCommand();
     949          20 :     ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
     950             :                                        originname, sizeof(originname));
     951          20 :     originid = replorigin_by_name(originname, false);
     952             : 
     953             :     /*
     954             :      * The parallel apply worker doesn't need to monopolize this replication
     955             :      * origin which was already acquired by its leader process.
     956             :      */
     957          20 :     replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
     958          20 :     replorigin_session_origin = originid;
     959          20 :     CommitTransactionCommand();
     960             : 
     961             :     /*
     962             :      * Setup callback for syscache so that we know when something changes in
     963             :      * the subscription relation state.
     964             :      */
     965          20 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
     966             :                                   invalidate_syncing_table_states,
     967             :                                   (Datum) 0);
     968             : 
     969          20 :     set_apply_error_context_origin(originname);
     970             : 
     971          20 :     LogicalParallelApplyLoop(mqh);
     972             : 
     973             :     /*
     974             :      * Control must never reach this point: a parallel apply worker stops
     975             :      * only when it receives SIGTERM or SIGINT from the leader, or when an
     976             :      * error occurs, and none of those paths return here from
     977             :      * LogicalParallelApplyLoop().
     978             :      */
     979             :     Assert(false);
     980           0 : }
     981             : 
     982             : /*
     983             :  * Handle receipt of an interrupt indicating a parallel apply worker message.
     984             :  *
     985             :  * Note: this is called within a signal handler! All we can do is set a flag
     986             :  * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
     987             :  * ProcessParallelApplyMessages().
     988             :  */
     989             : void
     990          22 : HandleParallelApplyMessageInterrupt(void)
     991             : {
     992          22 :     InterruptPending = true;
     993          22 :     ParallelApplyMessagePending = true;
     994          22 :     SetLatch(MyLatch);
     995          22 : }
     996             : 
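The handler above illustrates a constraint worth spelling out: inside a signal handler it is only safe to set flags and a latch, deferring all real work to the next CHECK_FOR_INTERRUPTS(). Below is a minimal standalone sketch of that pattern, using plain POSIX signals instead of PostgreSQL's latch machinery; all names in it are illustrative, not from the source.

    #include <signal.h>
    #include <stdio.h>
    #include <unistd.h>

    /* volatile sig_atomic_t flags are the only state a handler may safely set */
    static volatile sig_atomic_t interrupt_pending = 0;
    static volatile sig_atomic_t message_pending = 0;

    static void
    handle_message_interrupt(int signo)
    {
        /* no allocation, no I/O, no locks here -- just set the flags */
        (void) signo;
        interrupt_pending = 1;
        message_pending = 1;
    }

    int
    main(void)
    {
        signal(SIGUSR1, handle_message_interrupt);

        for (;;)
        {
            pause();                    /* stand-in for waiting on a latch */

            if (interrupt_pending)      /* stand-in for CHECK_FOR_INTERRUPTS() */
            {
                interrupt_pending = 0;
                if (message_pending)
                {
                    message_pending = 0;
                    puts("processing queued parallel apply messages");
                }
            }
        }
    }
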
     997             : /*
     998             :  * Process a single protocol message received from a single parallel apply
     999             :  * worker.
    1000             :  */
    1001             : static void
    1002           2 : ProcessParallelApplyMessage(StringInfo msg)
    1003             : {
    1004             :     char        msgtype;
    1005             : 
    1006           2 :     msgtype = pq_getmsgbyte(msg);
    1007             : 
    1008           2 :     switch (msgtype)
    1009             :     {
    1010           2 :         case 'E':               /* ErrorResponse */
    1011             :             {
    1012             :                 ErrorData   edata;
    1013             : 
    1014             :                 /* Parse ErrorResponse. */
    1015           2 :                 pq_parse_errornotice(msg, &edata);
    1016             : 
    1017             :                 /*
    1018             :                  * If desired, add a context line to show that this is a
    1019             :                  * message propagated from a parallel apply worker. Otherwise,
    1020             :                  * it can sometimes be confusing to understand what actually
    1021             :                  * happened.
    1022             :                  */
    1023           2 :                 if (edata.context)
    1024           2 :                     edata.context = psprintf("%s\n%s", edata.context,
    1025             :                                              _("logical replication parallel apply worker"));
    1026             :                 else
    1027           0 :                     edata.context = pstrdup(_("logical replication parallel apply worker"));
    1028             : 
    1029             :                 /*
    1030             :                  * Context beyond that should use the error context callbacks
    1031             :                  * that were in effect in LogicalRepApplyLoop().
    1032             :                  */
    1033           2 :                 error_context_stack = apply_error_context_stack;
    1034             : 
    1035             :                 /*
    1036             :                  * The actual error must have been reported by the parallel
    1037             :                  * apply worker.
    1038             :                  */
    1039           2 :                 ereport(ERROR,
    1040             :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1041             :                          errmsg("logical replication parallel apply worker exited due to error"),
    1042             :                          errcontext("%s", edata.context)));
    1043             :             }
    1044             : 
    1045             :             /*
    1046             :              * Nothing needs to be done for NoticeResponse and
    1047             :              * NotifyResponse, as the logical replication worker has no
    1048             :              * client to forward such messages to.
    1049             :              */
    1050           0 :         case 'N':
    1051             :         case 'A':
    1052           0 :             break;
    1053             : 
    1054           0 :         default:
    1055           0 :             elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
    1056             :                  msgtype, msg->len);
    1057             :     }
    1058           0 : }
    1059             : 
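The one-byte tags above ('E', 'N', 'A') are the usual frontend/backend protocol message types. Here is a hedged, self-contained sketch of the dispatch shape, with a hypothetical flat buffer in place of StringInfo and the pq_* parsing routines.

    #include <stdio.h>
    #include <stdlib.h>

    /* Dispatch on the leading message-type byte, as the switch above does. */
    static void
    process_message(const char *buf, int len)
    {
        char        msgtype = buf[0];

        switch (msgtype)
        {
            case 'E':               /* error: re-raise in the leader */
                fprintf(stderr, "worker error: %.*s\n", len - 1, buf + 1);
                exit(1);

            case 'N':               /* notice */
            case 'A':               /* notify: nothing to forward */
                break;

            default:
                fprintf(stderr, "unrecognized message type: %c (length %d)\n",
                        msgtype, len);
                exit(1);
        }
    }

    int
    main(void)
    {
        process_message("Nnotice text", 12);        /* silently ignored */
        process_message("Esomething broke", 16);    /* escalates and exits */
        return 0;
    }
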
    1060             : /*
    1061             :  * Handle any queued protocol messages received from parallel apply workers.
    1062             :  */
    1063             : void
    1064          12 : ProcessParallelApplyMessages(void)
    1065             : {
    1066             :     ListCell   *lc;
    1067             :     MemoryContext oldcontext;
    1068             : 
    1069             :     static MemoryContext hpam_context = NULL;
    1070             : 
    1071             :     /*
    1072             :      * This is invoked from ProcessInterrupts(), and since some of the
    1073             :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
    1074             :      * for recursive calls if more signals are received while this runs. It's
    1075             :      * unclear that recursive entry would be safe, and it doesn't seem useful
    1076             :      * even if it is safe, so let's block interrupts until done.
    1077             :      */
    1078          12 :     HOLD_INTERRUPTS();
    1079             : 
    1080             :     /*
    1081             :      * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
    1082             :      * don't want to risk leaking data into long-lived contexts, so let's do
    1083             :      * our work here in a private context that we can reset on each use.
    1084             :      */
    1085          12 :     if (!hpam_context)          /* first time through? */
    1086          10 :         hpam_context = AllocSetContextCreate(TopMemoryContext,
    1087             :                                              "ProcessParallelApplyMessages",
    1088             :                                              ALLOCSET_DEFAULT_SIZES);
    1089             :     else
    1090           2 :         MemoryContextReset(hpam_context);
    1091             : 
    1092          12 :     oldcontext = MemoryContextSwitchTo(hpam_context);
    1093             : 
    1094          12 :     ParallelApplyMessagePending = false;
    1095             : 
    1096          24 :     foreach(lc, ParallelApplyWorkerPool)
    1097             :     {
    1098             :         shm_mq_result res;
    1099             :         Size        nbytes;
    1100             :         void       *data;
    1101          14 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
    1102             : 
    1103             :         /*
    1104             :          * The leader will detach from the error queue and set it to NULL
    1105             :          * before preparing to stop all parallel apply workers, so we don't
    1106             :          * need to handle error messages anymore. See
    1107             :          * logicalrep_worker_detach.
    1108             :          */
    1109          14 :         if (!winfo->error_mq_handle)
    1110          12 :             continue;
    1111             : 
    1112           4 :         res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
    1113             : 
    1114           4 :         if (res == SHM_MQ_WOULD_BLOCK)
    1115           2 :             continue;
    1116           2 :         else if (res == SHM_MQ_SUCCESS)
    1117             :         {
    1118             :             StringInfoData msg;
    1119             : 
    1120           2 :             initStringInfo(&msg);
    1121           2 :             appendBinaryStringInfo(&msg, data, nbytes);
    1122           2 :             ProcessParallelApplyMessage(&msg);
    1123           0 :             pfree(msg.data);
    1124             :         }
    1125             :         else
    1126           0 :             ereport(ERROR,
    1127             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1128             :                      errmsg("lost connection to the logical replication parallel apply worker")));
    1129             :     }
    1130             : 
    1131          10 :     MemoryContextSwitchTo(oldcontext);
    1132             : 
    1133             :     /* Might as well clear the context on our way out */
    1134          10 :     MemoryContextReset(hpam_context);
    1135             : 
    1136          10 :     RESUME_INTERRUPTS();
    1137          10 : }
    1138             : 
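The lazily created, reset-per-call context above is the standard defense against leaking into whatever CurrentMemoryContext happens to be when the interrupt fires. A rough standalone analogue using a reusable scratch arena follows; it is purely illustrative, as PostgreSQL's MemoryContext API is far richer than this.

    #include <stdio.h>
    #include <stdlib.h>

    /* A reusable scratch arena: cheap allocation, O(1) reset per use. */
    typedef struct
    {
        char       *base;
        size_t      used;
        size_t      cap;
    } Arena;

    static Arena *
    arena_create(size_t cap)
    {
        Arena      *a = malloc(sizeof(Arena));

        a->base = malloc(cap);
        a->used = 0;
        a->cap = cap;
        return a;
    }

    static void *
    arena_alloc(Arena *a, size_t n)
    {
        void       *p = a->base + a->used;

        a->used += n;               /* overflow handling omitted: a sketch */
        return p;
    }

    static void
    process_messages(void)
    {
        static Arena *scratch = NULL;

        if (!scratch)               /* first time through? */
            scratch = arena_create(64 * 1024);
        else
            scratch->used = 0;      /* the "MemoryContextReset" step */

        char       *msg = arena_alloc(scratch, 64);

        snprintf(msg, 64, "transient message text");
        puts(msg);                  /* nothing leaks into long-lived storage */
    }

    int
    main(void)
    {
        process_messages();
        process_messages();         /* second call reuses the same memory */
        return 0;
    }
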
    1139             : /*
    1140             :  * Send the data to the specified parallel apply worker via the shared-memory
    1141             :  * queue.
    1142             :  *
    1143             :  * Returns false if the attempt to send data via shared memory times out, true
    1144             :  * otherwise.
    1145             :  */
    1146             : bool
    1147      137824 : pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
    1148             : {
    1149             :     int         rc;
    1150             :     shm_mq_result result;
    1151      137824 :     TimestampTz startTime = 0;
    1152             : 
    1153             :     Assert(!IsTransactionState());
    1154             :     Assert(!winfo->serialize_changes);
    1155             : 
    1156             :     /*
    1157             :      * We don't try to send data to the parallel worker in 'immediate' mode;
    1158             :      * that mode exists primarily for testing purposes.
    1159             :      */
    1160      137824 :     if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
    1161           8 :         return false;
    1162             : 
    1163             : /*
    1164             :  * This timeout is somewhat arbitrary, but testing showed that it suffices to
    1165             :  * send the message unless the parallel apply worker is blocked on a lock or
    1166             :  * there is a serious resource crunch. See the comments atop this file for
    1167             :  * why the message is sent in a non-blocking way.
    1168             :  */
    1169             : #define SHM_SEND_RETRY_INTERVAL_MS 1000
    1170             : #define SHM_SEND_TIMEOUT_MS     (10000 - SHM_SEND_RETRY_INTERVAL_MS)
    1171             : 
    1172             :     for (;;)
    1173             :     {
    1174      137816 :         result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
    1175             : 
    1176      137816 :         if (result == SHM_MQ_SUCCESS)
    1177      137816 :             return true;
    1178           0 :         else if (result == SHM_MQ_DETACHED)
    1179           0 :             ereport(ERROR,
    1180             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1181             :                      errmsg("could not send data to shared-memory queue")));
    1182             : 
    1183             :         Assert(result == SHM_MQ_WOULD_BLOCK);
    1184             : 
    1185             :         /* Wait before retrying. */
    1186           0 :         rc = WaitLatch(MyLatch,
    1187             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1188             :                        SHM_SEND_RETRY_INTERVAL_MS,
    1189             :                        WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
    1190             : 
    1191           0 :         if (rc & WL_LATCH_SET)
    1192             :         {
    1193           0 :             ResetLatch(MyLatch);
    1194           0 :             CHECK_FOR_INTERRUPTS();
    1195             :         }
    1196             : 
    1197           0 :         if (startTime == 0)
    1198           0 :             startTime = GetCurrentTimestamp();
    1199           0 :         else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
    1200             :                                             SHM_SEND_TIMEOUT_MS))
    1201           0 :             return false;
    1202             :     }
    1203             : }
    1204             : 
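Note how the deadline in pa_send_data() works: the first timestamp is taken only after one full 1000 ms wait, and the loop then gives up once a further SHM_SEND_TIMEOUT_MS (9000 ms) has elapsed, for a worst case of roughly ten seconds. Below is a self-contained sketch of the same retry-until-deadline shape, with nanosleep standing in for WaitLatch and a hypothetical try_send() standing in for the non-blocking shm_mq_send.

    #include <stdbool.h>
    #include <stdio.h>
    #include <time.h>

    #define RETRY_INTERVAL_MS 1000
    #define TIMEOUT_MS        (10000 - RETRY_INTERVAL_MS)

    static long long
    now_ms(void)
    {
        struct timespec ts;

        clock_gettime(CLOCK_MONOTONIC, &ts);
        return ts.tv_sec * 1000LL + ts.tv_nsec / 1000000;
    }

    /* Stand-in for shm_mq_send in non-blocking mode; succeeds eventually. */
    static bool
    try_send(int attempt)
    {
        return attempt >= 3;
    }

    static bool
    send_with_timeout(void)
    {
        long long   start = 0;

        for (int attempt = 0;; attempt++)
        {
            if (try_send(attempt))
                return true;        /* SHM_MQ_SUCCESS */

            /* Wait before retrying (WaitLatch in the real code). */
            struct timespec iv = {RETRY_INTERVAL_MS / 1000,
                (RETRY_INTERVAL_MS % 1000) * 1000000L};

            nanosleep(&iv, NULL);

            /* The clock starts only after the first failed attempt. */
            if (start == 0)
                start = now_ms();
            else if (now_ms() - start > TIMEOUT_MS)
                return false;       /* caller switches to serializing */
        }
    }

    int
    main(void)
    {
        puts(send_with_timeout() ? "sent" : "timed out");
        return 0;
    }
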
    1205             : /*
    1206             :  * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
    1207             :  * that the current data and any subsequent data for this transaction will be
    1208             :  * serialized to a file. This is done to prevent possible deadlocks with
    1209             :  * another parallel apply worker (refer to the comments atop this file).
    1210             :  */
    1211             : void
    1212           8 : pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
    1213             :                                bool stream_locked)
    1214             : {
    1215           8 :     ereport(LOG,
    1216             :             (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
    1217             :                     winfo->shared->xid)));
    1218             : 
    1219             :     /*
    1220             :      * The parallel apply worker could be stuck for some reason (say waiting
    1221             :      * on some lock by other backend), so stop trying to send data directly to
    1222             :      * it and start serializing data to the file instead.
    1223             :      */
    1224           8 :     winfo->serialize_changes = true;
    1225             : 
    1226             :     /* Initialize the stream fileset. */
    1227           8 :     stream_start_internal(winfo->shared->xid, true);
    1228             : 
    1229             :     /*
    1230             :      * Acquire the stream lock if it is not already held, so that the
    1231             :      * parallel apply worker keeps waiting for the leader to release the
    1232             :      * stream lock until the end of the transaction.
    1233             :      */
    1234           8 :     if (!stream_locked)
    1235           8 :         pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
    1236             : 
    1237           8 :     pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
    1238           8 : }
    1239             : 
    1240             : /*
    1241             :  * Wait until the parallel apply worker's transaction state has reached or
    1242             :  * exceeded the given xact_state.
    1243             :  */
    1244             : static void
    1245          50 : pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
    1246             :                        ParallelTransState xact_state)
    1247             : {
    1248             :     for (;;)
    1249             :     {
    1250             :         /*
    1251             :          * Stop if the transaction state has reached or exceeded the given
    1252             :          * xact_state.
    1253             :          */
    1254         512 :         if (pa_get_xact_state(winfo->shared) >= xact_state)
    1255          50 :             break;
    1256             : 
    1257             :         /* Wait to be signalled. */
    1258         462 :         (void) WaitLatch(MyLatch,
    1259             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1260             :                          10L,
    1261             :                          WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
    1262             : 
    1263             :         /* Reset the latch so we don't spin. */
    1264         462 :         ResetLatch(MyLatch);
    1265             : 
    1266             :         /* An interrupt may have occurred while we were waiting. */
    1267         462 :         CHECK_FOR_INTERRUPTS();
    1268             :     }
    1269          50 : }
    1270             : 
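The loop above is a straightforward poll: sleep up to 10 ms on the latch, re-read the shared state, and stop once it has reached the wanted value. A minimal standalone analogue with a C11 atomic holding the state is sketched below; the latch wake-up and CHECK_FOR_INTERRUPTS() are PostgreSQL-specific and omitted.

    #include <stdatomic.h>
    #include <stdio.h>
    #include <time.h>

    typedef enum
    {
        TRANS_UNKNOWN,
        TRANS_STARTED,
        TRANS_FINISHED
    } TransState;

    static _Atomic TransState shared_state = TRANS_UNKNOWN;

    static void
    wait_for_state(TransState wanted)
    {
        const struct timespec poll = {0, 10 * 1000000L};    /* 10 ms */

        /* Stop once the state has reached or exceeded the wanted one. */
        while (atomic_load(&shared_state) < wanted)
            nanosleep(&poll, NULL);     /* WaitLatch/ResetLatch in the source */
    }

    int
    main(void)
    {
        /* Normally another process advances the state; done inline here. */
        atomic_store(&shared_state, TRANS_STARTED);
        wait_for_state(TRANS_STARTED);
        puts("state reached");
        return 0;
    }
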
    1271             : /*
    1272             :  * Wait until the parallel apply worker's transaction finishes.
    1273             :  */
    1274             : static void
    1275          50 : pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
    1276             : {
    1277             :     /*
    1278             :      * Wait until the parallel apply worker has set the state to
    1279             :      * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
    1280             :      * lock. This prevents the leader apply worker from acquiring the
    1281             :      * transaction lock before the parallel apply worker does.
    1282             :      */
    1283          50 :     pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
    1284             : 
    1285             :     /*
    1286             :      * Wait for the transaction lock to be released. This is required to
    1287             :      * detect deadlock among leader and parallel apply workers. Refer to the
    1288             :      * comments atop this file.
    1289             :      */
    1290          50 :     pa_lock_transaction(winfo->shared->xid, AccessShareLock);
    1291          48 :     pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
    1292             : 
    1293             :     /*
    1294             :      * Check that the state has reached PARALLEL_TRANS_FINISHED; if not, the
    1295             :      * parallel apply worker must have failed while applying changes, which
    1296             :      * also caused the lock to be released.
    1297             :      */
    1298          48 :     if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
    1299           0 :         ereport(ERROR,
    1300             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1301             :                  errmsg("lost connection to the logical replication parallel apply worker")));
    1302          48 : }
    1303             : 
    1304             : /*
    1305             :  * Set the transaction state for a given parallel apply worker.
    1306             :  */
    1307             : void
    1308         102 : pa_set_xact_state(ParallelApplyWorkerShared *wshared,
    1309             :                   ParallelTransState xact_state)
    1310             : {
    1311         102 :     SpinLockAcquire(&wshared->mutex);
    1312         102 :     wshared->xact_state = xact_state;
    1313         102 :     SpinLockRelease(&wshared->mutex);
    1314         102 : }
    1315             : 
    1316             : /*
    1317             :  * Get the transaction state for a given parallel apply worker.
    1318             :  */
    1319             : static ParallelTransState
    1320         560 : pa_get_xact_state(ParallelApplyWorkerShared *wshared)
    1321             : {
    1322             :     ParallelTransState xact_state;
    1323             : 
    1324         560 :     SpinLockAcquire(&wshared->mutex);
    1325         560 :     xact_state = wshared->xact_state;
    1326         560 :     SpinLockRelease(&wshared->mutex);
    1327             : 
    1328         560 :     return xact_state;
    1329             : }
    1330             : 
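pa_set_xact_state() and pa_get_xact_state() bracket every touch of the shared field with the same mutex, so a reader can never observe a torn write. Below is a standalone sketch of that accessor pair using a pthread spinlock in place of PostgreSQL's SpinLockAcquire/SpinLockRelease; the names are illustrative.

    #include <pthread.h>
    #include <stdio.h>

    typedef enum
    {
        TRANS_UNKNOWN,
        TRANS_STARTED,
        TRANS_FINISHED
    } TransState;

    typedef struct
    {
        pthread_spinlock_t mutex;   /* protects all fields below */
        TransState  xact_state;
    } SharedInfo;

    static void
    set_xact_state(SharedInfo *shared, TransState state)
    {
        pthread_spin_lock(&shared->mutex);
        shared->xact_state = state;
        pthread_spin_unlock(&shared->mutex);
    }

    static TransState
    get_xact_state(SharedInfo *shared)
    {
        TransState  state;

        pthread_spin_lock(&shared->mutex);
        state = shared->xact_state;
        pthread_spin_unlock(&shared->mutex);
        return state;
    }

    int
    main(void)
    {
        SharedInfo  shared;

        pthread_spin_init(&shared.mutex, PTHREAD_PROCESS_PRIVATE);
        set_xact_state(&shared, TRANS_STARTED);
        printf("state = %d\n", (int) get_xact_state(&shared));
        pthread_spin_destroy(&shared.mutex);
        return 0;
    }
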
    1331             : /*
    1332             :  * Cache the parallel apply worker information.
    1333             :  */
    1334             : void
    1335        1044 : pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
    1336             : {
    1337        1044 :     stream_apply_worker = winfo;
    1338        1044 : }
    1339             : 
    1340             : /*
    1341             :  * Form a unique savepoint name for the streaming transaction.
    1342             :  *
    1343             :  * Note that different subscriptions for publications on different nodes can
    1344             :  * receive the same remote xid, so the subscription id is included as well.
    1345             :  *
    1346             :  * Returns the name in the supplied buffer.
    1347             :  */
    1348             : static void
    1349          54 : pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
    1350             : {
    1351          54 :     snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
    1352          54 : }
    1353             : 
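Concretely, a subscription with OID 16394 applying remote xid 745 gets the savepoint name pg_sp_16394_745, which stays unique even if another subscription receives the same xid. A tiny check of that, reusing the same format string (NAMEDATALEN is 64 in a default build; the OID and xid values are made up for the demo):

    #include <stdio.h>

    #define NAMEDATALEN 64          /* PostgreSQL's default identifier limit */

    int
    main(void)
    {
        char        spname[NAMEDATALEN];

        snprintf(spname, sizeof(spname), "pg_sp_%u_%u",
                 (unsigned int) 16394,  /* subscription OID */
                 (unsigned int) 745);   /* remote transaction id */
        puts(spname);               /* prints: pg_sp_16394_745 */
        return 0;
    }
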
    1354             : /*
    1355             :  * Define a savepoint for a subxact in parallel apply worker if needed.
    1356             :  *
    1357             :  * The parallel apply worker can detect that a new subtransaction was
    1358             :  * started by checking whether a new change arrived with a different xid. In
    1359             :  * that case it defines a named savepoint, so that we are able to roll back
    1360             :  * to it if required.
    1361             :  */
    1362             : void
    1363      136788 : pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
    1364             : {
    1365      136788 :     if (current_xid != top_xid &&
    1366         104 :         !list_member_xid(subxactlist, current_xid))
    1367             :     {
    1368             :         MemoryContext oldctx;
    1369             :         char        spname[NAMEDATALEN];
    1370             : 
    1371          34 :         pa_savepoint_name(MySubscription->oid, current_xid,
    1372             :                           spname, sizeof(spname));
    1373             : 
    1374          34 :         elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
    1375             : 
    1376             :         /* We must be in transaction block to define the SAVEPOINT. */
    1377          34 :         if (!IsTransactionBlock())
    1378             :         {
    1379          10 :             if (!IsTransactionState())
    1380           0 :                 StartTransactionCommand();
    1381             : 
    1382          10 :             BeginTransactionBlock();
    1383          10 :             CommitTransactionCommand();
    1384             :         }
    1385             : 
    1386          34 :         DefineSavepoint(spname);
    1387             : 
    1388             :         /*
    1389             :          * CommitTransactionCommand is needed to start a subtransaction after
    1390             :          * issuing a SAVEPOINT inside a transaction block (see
    1391             :          * StartSubTransaction()).
    1392             :          */
    1393          34 :         CommitTransactionCommand();
    1394             : 
    1395          34 :         oldctx = MemoryContextSwitchTo(TopTransactionContext);
    1396          34 :         subxactlist = lappend_xid(subxactlist, current_xid);
    1397          34 :         MemoryContextSwitchTo(oldctx);
    1398             :     }
    1399      136788 : }
    1400             : 
    1401             : /* Reset the list that maintains subtransactions. */
    1402             : void
    1403          48 : pa_reset_subtrans(void)
    1404             : {
    1405             :     /*
    1406             :      * We don't need to free this explicitly as the allocated memory will be
    1407             :      * freed at the transaction end.
    1408             :      */
    1409          48 :     subxactlist = NIL;
    1410          48 : }
    1411             : 
    1412             : /*
    1413             :  * Handle STREAM ABORT message when the transaction was applied in a parallel
    1414             :  * apply worker.
    1415             :  */
    1416             : void
    1417          24 : pa_stream_abort(LogicalRepStreamAbortData *abort_data)
    1418             : {
    1419          24 :     TransactionId xid = abort_data->xid;
    1420          24 :     TransactionId subxid = abort_data->subxid;
    1421             : 
    1422             :     /*
    1423             :      * Update the origin state so we can restart streaming from the correct
    1424             :      * position in case of a crash.
    1425             :      */
    1426          24 :     replorigin_session_origin_lsn = abort_data->abort_lsn;
    1427          24 :     replorigin_session_origin_timestamp = abort_data->abort_time;
    1428             : 
    1429             :     /*
    1430             :      * If the two XIDs are the same, it's in fact an abort of the toplevel
    1431             :      * xact, so just free the subxactlist.
    1432             :      */
    1433          24 :     if (subxid == xid)
    1434             :     {
    1435           4 :         pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    1436             : 
    1437             :         /*
    1438             :          * Release the lock, as we might be processing an empty streaming
    1439             :          * transaction, in which case the lock won't be released during
    1440             :          * transaction rollback.
    1441             :          *
    1442             :          * Note that it's ok to release the transaction lock before aborting
    1443             :          * the transaction because even if the parallel apply worker dies due
    1444             :          * to crash or some other reason, such a transaction would still be
    1445             :          * considered aborted.
    1446             :          */
    1447           4 :         pa_unlock_transaction(xid, AccessExclusiveLock);
    1448             : 
    1449           4 :         AbortCurrentTransaction();
    1450             : 
    1451           4 :         if (IsTransactionBlock())
    1452             :         {
    1453           2 :             EndTransactionBlock(false);
    1454           2 :             CommitTransactionCommand();
    1455             :         }
    1456             : 
    1457           4 :         pa_reset_subtrans();
    1458             : 
    1459           4 :         pgstat_report_activity(STATE_IDLE, NULL);
    1460             :     }
    1461             :     else
    1462             :     {
    1463             :         /* OK, so it's a subxact. Rollback to the savepoint. */
    1464             :         int         i;
    1465             :         char        spname[NAMEDATALEN];
    1466             : 
    1467          20 :         pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
    1468             : 
    1469          20 :         elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
    1470             : 
    1471             :         /*
    1472             :          * Search the subxactlist, determine the offset tracked for the
    1473             :          * subxact, and truncate the list.
    1474             :          *
    1475             :          * Note that for an empty sub-transaction we won't find the subxid
    1476             :          * here.
    1477             :          */
    1478          24 :         for (i = list_length(subxactlist) - 1; i >= 0; i--)
    1479             :         {
    1480          22 :             TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
    1481             : 
    1482          22 :             if (xid_tmp == subxid)
    1483             :             {
    1484          18 :                 RollbackToSavepoint(spname);
    1485          18 :                 CommitTransactionCommand();
    1486          18 :                 subxactlist = list_truncate(subxactlist, i);
    1487          18 :                 break;
    1488             :             }
    1489             :         }
    1490             :     }
    1491          24 : }
    1492             : 
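The subxact branch above scans the tracked xids from newest to oldest and truncates the list at the aborted one, since rolling back to a savepoint also discards every savepoint defined after it. Here is a standalone sketch of that bookkeeping with a plain array in place of the PostgreSQL List; the xid values are made up for the demo.

    #include <stdio.h>

    typedef unsigned int TransactionId;

    static TransactionId subxacts[16] = {700, 701, 702, 703};
    static int  nsubxacts = 4;

    static void
    rollback_to_subxact(TransactionId subxid)
    {
        /* Newest to oldest; an empty subxact is simply not found. */
        for (int i = nsubxacts - 1; i >= 0; i--)
        {
            if (subxacts[i] == subxid)
            {
                /* RollbackToSavepoint() would run here in the source. */
                nsubxacts = i;      /* drop this entry and everything after */
                break;
            }
        }
    }

    int
    main(void)
    {
        rollback_to_subxact(701);
        printf("%d tracked subxact(s) remain\n", nsubxacts); /* prints 1 */
        return 0;
    }
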
    1493             : /*
    1494             :  * Set the fileset state for a particular parallel apply worker. The fileset
    1495             :  * itself is set once the leader worker has serialized all changes to the
    1496             :  * file, so that the parallel apply worker can use it.
    1497             :  */
    1498             : void
    1499          32 : pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
    1500             :                      PartialFileSetState fileset_state)
    1501             : {
    1502          32 :     SpinLockAcquire(&wshared->mutex);
    1503          32 :     wshared->fileset_state = fileset_state;
    1504             : 
    1505          32 :     if (fileset_state == FS_SERIALIZE_DONE)
    1506             :     {
    1507             :         Assert(am_leader_apply_worker());
    1508             :         Assert(MyLogicalRepWorker->stream_fileset);
    1509           8 :         wshared->fileset = *MyLogicalRepWorker->stream_fileset;
    1510             :     }
    1511             : 
    1512          32 :     SpinLockRelease(&wshared->mutex);
    1513          32 : }
    1514             : 
    1515             : /*
    1516             :  * Get the fileset state for the current parallel apply worker.
    1517             :  */
    1518             : static PartialFileSetState
    1519         142 : pa_get_fileset_state(void)
    1520             : {
    1521             :     PartialFileSetState fileset_state;
    1522             : 
    1523             :     Assert(am_parallel_apply_worker());
    1524             : 
    1525         142 :     SpinLockAcquire(&MyParallelShared->mutex);
    1526         142 :     fileset_state = MyParallelShared->fileset_state;
    1527         142 :     SpinLockRelease(&MyParallelShared->mutex);
    1528             : 
    1529         142 :     return fileset_state;
    1530             : }
    1531             : 
    1532             : /*
    1533             :  * Helper functions to acquire and release a lock for each stream block.
    1534             :  *
    1535             :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
    1536             :  * stream lock.
    1537             :  *
    1538             :  * Refer to the comments atop this file to see how the stream lock is used.
    1539             :  */
    1540             : void
    1541         566 : pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
    1542             : {
    1543         566 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1544             :                                    PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1545         562 : }
    1546             : 
    1547             : void
    1548         558 : pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
    1549             : {
    1550         558 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1551             :                                      PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1552         558 : }
    1553             : 
    1554             : /*
    1555             :  * Helper functions to acquire and release a lock for each local transaction
    1556             :  * apply.
    1557             :  *
    1558             :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
    1559             :  * transaction lock.
    1560             :  *
    1561             :  * Note that all callers must pass a remote transaction ID as xid, not a
    1562             :  * local one. The local transaction ID is only assigned while applying the
    1563             :  * first change in the parallel apply worker, and that first change may be
    1564             :  * blocked by a concurrently executing transaction in another parallel apply
    1565             :  * worker. Since the local transaction ID can only be communicated to the
    1566             :  * leader after the first change has been applied, the leader could not use
    1567             :  * it to wait after sending the xact finish command; hence this lock uses
    1568             :  * the remote ID.
    1569             :  *
    1570             :  * Refer to the comments atop this file to see how the transaction lock is
    1571             :  * used.
    1572             :  */
    1573             : void
    1574         104 : pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
    1575             : {
    1576         104 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1577             :                                    PARALLEL_APPLY_LOCK_XACT, lockmode);
    1578         102 : }
    1579             : 
    1580             : void
    1581          96 : pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
    1582             : {
    1583          96 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1584             :                                      PARALLEL_APPLY_LOCK_XACT, lockmode);
    1585          96 : }
    1586             : 
    1587             : /*
    1588             :  * Decrement the number of pending streaming blocks and wait on the stream lock
    1589             :  * if there is no pending block available.
    1590             :  */
    1591             : void
    1592         520 : pa_decr_and_wait_stream_block(void)
    1593             : {
    1594             :     Assert(am_parallel_apply_worker());
    1595             : 
    1596             :     /*
    1597             :      * The pending stream chunk count can only be zero while we are applying
    1598             :      * spooled messages.
    1599             :      */
    1600         520 :     if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
    1601             :     {
    1602          32 :         if (pa_has_spooled_message_pending())
    1603          32 :             return;
    1604             : 
    1605           0 :         elog(ERROR, "invalid pending streaming chunk 0");
    1606             :     }
    1607             : 
    1608         488 :     if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
    1609             :     {
    1610          48 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
    1611          44 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
    1612             :     }
    1613             : }
    1614             : 
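pg_atomic_sub_fetch_u32() decrements and returns the new value in a single atomic step, so exactly one caller can observe zero and take the lock-wait path. A standalone equivalent with C11 atomics follows (atomic_fetch_sub returns the old value, hence the explicit minus one); the starting count is made up for the demo.

    #include <stdatomic.h>
    #include <stdio.h>

    static atomic_uint pending_stream_count = 3;

    /* Returns 1 when this call consumed the last pending chunk. */
    static int
    decr_stream_block(void)
    {
        unsigned int newval = atomic_fetch_sub(&pending_stream_count, 1) - 1;

        return newval == 0;
    }

    int
    main(void)
    {
        while (!decr_stream_block())
            puts("chunk consumed, more pending");
        puts("last chunk consumed: would now wait on the stream lock");
        return 0;
    }
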
    1615             : /*
    1616             :  * Finish processing the streaming transaction in the leader apply worker.
    1617             :  */
    1618             : void
    1619          50 : pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
    1620             : {
    1621             :     Assert(am_leader_apply_worker());
    1622             : 
    1623             :     /*
    1624             :      * Release the stream lock so that the parallel apply worker can
    1625             :      * continue to receive and apply changes.
    1626             :      */
    1627          50 :     pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
    1628             : 
    1629             :     /*
    1630             :      * Wait for that worker to finish. This is necessary to maintain commit
    1631             :      * order which avoids failures due to transaction dependencies and
    1632             :      * order, which avoids failures due to transaction dependencies and
    1633             :      */
    1634          50 :     pa_wait_for_xact_finish(winfo);
    1635             : 
    1636          48 :     if (!XLogRecPtrIsInvalid(remote_lsn))
    1637          44 :         store_flush_position(remote_lsn, winfo->shared->last_commit_end);
    1638             : 
    1639          48 :     pa_free_worker(winfo);
    1640          48 : }

Generated by: LCOV version 1.16