LCOV - code coverage report
Current view: top level - src/backend/replication/logical - applyparallelworker.c (source / functions)
Test: PostgreSQL 18devel
Date: 2025-01-18 05:15:39

              Hit    Total    Coverage
Lines:        368      411      89.5 %
Functions:     36       36     100.0 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * applyparallelworker.c
       3             :  *     Support routines for applying xact by parallel apply worker
       4             :  *
       5             :  * Copyright (c) 2023-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/applyparallelworker.c
       9             :  *
      10             :  * This file contains the code to launch, set up, and tear down a parallel apply
      11             :  * worker which receives the changes from the leader worker and invokes routines
      12             :  * to apply those on the subscriber database. Additionally, this file contains
      13             :  * routines that are intended to support setting up, using, and tearing down a
      14             :  * ParallelApplyWorkerInfo which is required so the leader worker and parallel
      15             :  * apply workers can communicate with each other.
      16             :  *
      17             :  * The parallel apply workers are assigned (if available) as soon as xact's
      18             :  * first stream is received for subscriptions that have set their 'streaming'
      19             :  * option as parallel. The leader apply worker will send changes to this new
      20             :  * worker via shared memory. We keep this worker assigned till the transaction
      21             :  * commit is received and also wait for the worker to finish at commit. This
      22             :  * preserves commit ordering and avoids file I/O in most cases, although we
      23             :  * still need to spill to a file if there is no worker available. See comments
      24             :  * atop logical/worker to know more about streamed xacts whose changes are
      25             :  * spilled to disk. It is important to maintain commit order to avoid failures
      26             :  * due to: (a) transaction dependencies - say if we insert a row in the first
      27             :  * transaction and update it in the second transaction on publisher then
      28             :  * allowing the subscriber to apply both in parallel can lead to failure in the
      29             :  * update; (b) deadlocks - allowing transactions that update the same set of
      30             :  * rows/tables in the opposite order to be applied in parallel can lead to
      31             :  * deadlocks.
      32             :  *
      33             :  * A worker pool is used to avoid restarting workers for each streaming
      34             :  * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
      35             :  * in the ParallelApplyWorkerPool. After successfully launching a new worker,
      36             :  * its information is added to the ParallelApplyWorkerPool. Once the worker
      37             :  * finishes applying the transaction, it is marked as available for re-use.
      38             :  * Now, before starting a new worker to apply the streaming transaction, we
      39             :  * check the list for any available worker. Note that we retain a maximum of
      40             :  * half the max_parallel_apply_workers_per_subscription workers in the pool and
      41             :  * after that, we simply exit the worker after applying the transaction.
      42             :  *
      43             :  * XXX This worker pool threshold is arbitrary and we can provide a GUC
      44             :  * variable for this in the future if required.
      45             :  *
      46             :  * The leader apply worker will create a separate dynamic shared memory segment
      47             :  * when each parallel apply worker starts. The reason for this design is that
      48             :  * we cannot predict how many workers will be needed. It may be possible to
      49             :  * allocate enough shared memory in one segment based on the maximum number of
      50             :  * parallel apply workers (max_parallel_apply_workers_per_subscription), but
      51             :  * this would waste memory if no process is actually started.
      52             :  *
      53             :  * The dynamic shared memory segment contains: (a) a shm_mq that is used to
      54             :  * send changes in the transaction from leader apply worker to parallel apply
      55             :  * worker; (b) another shm_mq that is used to send errors (and other messages
      56             :  * reported via elog/ereport) from the parallel apply worker to leader apply
      57             :  * worker; (c) necessary information to be shared among parallel apply workers
      58             :  * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
      59             :  *
      60             :  * Locking Considerations
      61             :  * ----------------------
      62             :  * We have a risk of deadlock due to concurrently applying the transactions in
      63             :  * parallel mode that were independent on the publisher side but became
      64             :  * dependent on the subscriber side due to the different database structures
      65             :  * (like schema of subscription tables, constraints, etc.) on each side. This
      66             :  * can happen even without parallel mode when there are concurrent operations
      67             :  * on the subscriber. In order to detect the deadlocks among leader (LA) and
      68             :  * parallel apply (PA) workers, we use lmgr locks when the PA waits for the
      69             :  * next stream (set of changes) and LA waits for PA to finish the transaction.
      70             :  * An alternative approach could be to not allow parallelism when the schema of
      71             :  * tables is different between the publisher and subscriber but that would be
      72             :  * too restrictive and would require the publisher to send much more
      73             :  * information than it is currently sending.
      74             :  *
      75             :  * Consider a case where the subscribed table does not have a unique key on the
      76             :  * publisher and has a unique key on the subscriber. The deadlock can happen in
      77             :  * the following ways:
      78             :  *
      79             :  * 1) Deadlock between the leader apply worker and a parallel apply worker
      80             :  *
      81             :  * Consider that the parallel apply worker (PA) is executing TX-1 and the
      82             :  * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
      83             :  * Now, LA is waiting for PA because of the unique key constraint of the
      84             :  * subscribed table while PA is waiting for LA to send the next stream of
      85             :  * changes or transaction finish command message.
      86             :  *
      87             :  * In order for lmgr to detect this, we have LA acquire a session lock on the
      88             :  * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
      89             :  * trying to receive the next stream of changes. Specifically, LA will acquire
      90             :  * the lock in AccessExclusive mode before sending the STREAM_STOP and will
      91             :  * release it if already acquired after sending the STREAM_START, STREAM_ABORT
      92             :  * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
      93             :  * acquire the lock in AccessShare mode after processing STREAM_STOP and
      94             :  * STREAM_ABORT (for subtransaction) and then release the lock immediately
      95             :  * after acquiring it.
      96             :  *
      97             :  * The lock graph for the above example will look as follows:
      98             :  * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
      99             :  * acquire the stream lock) -> LA
     100             :  *
     101             :  * This way, when PA is waiting for LA for the next stream of changes, we can
     102             :  * have a wait-edge from PA to LA in lmgr, which will make us detect the
     103             :  * deadlock between LA and PA.
     104             :  *
     105             :  * 2) Deadlock between the leader apply worker and parallel apply workers
     106             :  *
     107             :  * This scenario is similar to the first case but TX-1 and TX-2 are executed by
     108             :  * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
     109             :  * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
     110             :  * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
     111             :  * transaction in order to preserve the commit order. There is a deadlock among
     112             :  * the three processes.
     113             :  *
     114             :  * In order for lmgr to detect this, we have PA acquire a session lock (this is
     115             :  * a different lock than the one referred to in the previous case, see
     116             :  * pa_lock_transaction()) on the transaction being applied and have LA wait on
     117             :  * the lock before proceeding in the transaction finish commands. Specifically,
     118             :  * PA will acquire this lock in AccessExclusive mode before executing the first
     119             :  * message of the transaction and release it at the xact end. LA will acquire
     120             :  * this lock in AccessShare mode at transaction finish commands and release it
     121             :  * immediately.
     122             :  *
     123             :  * The lock graph for the above example will look as follows:
     124             :  * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
     125             :  * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
     126             :  * lock) -> LA
     127             :  *
     128             :  * This way, when LA is waiting to finish the transaction end command to preserve
     129             :  * the commit order, we will be able to detect deadlock, if any.
     130             :  *
     131             :  * One might think we can use XactLockTableWait(), but XactLockTableWait()
     132             :  * considers PREPARED TRANSACTION as still in progress which means the lock
     133             :  * won't be released even after the parallel apply worker has prepared the
     134             :  * transaction.
     135             :  *
     136             :  * 3) Deadlock when the shm_mq buffer is full
     137             :  *
     138             :  * In the previous scenario (i.e., PA-1 and PA-2 are executing transactions
     139             :  * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
     140             :  * wait to send messages, and this wait doesn't appear in lmgr.
     141             :  *
     142             :  * To avoid this wait, we use a non-blocking write and wait with a timeout. If
     143             :  * the timeout is exceeded, the LA will serialize all the pending messages to
     144             :  * a file and indicate to PA-2 that it needs to read that file for the remaining
     145             :  * messages. Then LA will start waiting for commit as in the previous case
     146             :  * which will detect deadlock if any. See pa_send_data() and
     147             :  * enum TransApplyAction.
     148             :  *
     149             :  * Lock types
     150             :  * ----------
     151             :  * Both the stream lock and the transaction lock mentioned above are
     152             :  * session-level locks because both locks could be acquired outside the
     153             :  * transaction, and the stream lock in the leader needs to persist across
     154             :  * transaction boundaries i.e. until the end of the streaming transaction.
     155             :  *-------------------------------------------------------------------------
     156             :  */
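
The lock graph described for scenario 2 in the header comment can be pictured as a chain of wait-for edges. The following is a minimal standalone sketch of following such a chain, not PostgreSQL's lmgr deadlock detector: the enum values, `NO_WAIT`, and `wait_graph_has_cycle()` are hypothetical names introduced only for illustration, and the sketch assumes each process waits on at most one other process.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical process slots for this sketch; not PostgreSQL identifiers. */
enum
{
    LA = 0,                     /* leader apply worker */
    PA1 = 1,                    /* first parallel apply worker */
    PA2 = 2,                    /* second parallel apply worker */
    NPROCS = 3
};

#define NO_WAIT (-1)

/*
 * waits_for[i] is the process that process i is blocked on, or NO_WAIT.
 * Follow the edges from 'start'; if we come back to 'start', the chain is
 * a cycle.  The walk is bounded by NPROCS so it always terminates.
 */
static bool
wait_graph_has_cycle(const int waits_for[NPROCS], int start)
{
    int         cur;
    int         steps;

    assert(start >= 0 && start < NPROCS);

    cur = waits_for[start];
    for (steps = 0; steps < NPROCS && cur != NO_WAIT; steps++)
    {
        if (cur == start)
            return true;
        cur = waits_for[cur];
    }
    return false;
}
```

With the edges LA -> PA-2 -> PA-1 -> LA, the function reports a cycle starting from LA. If the PA-1 -> LA edge is missing (the wait for the next stream, which is only visible once it is expressed as a lock wait), no cycle is found, which is the situation the session-level locks are meant to prevent.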
     157             : 
     158             : #include "postgres.h"
     159             : 
     160             : #include "libpq/pqformat.h"
     161             : #include "libpq/pqmq.h"
     162             : #include "pgstat.h"
     163             : #include "postmaster/interrupt.h"
     164             : #include "replication/logicallauncher.h"
     165             : #include "replication/logicalworker.h"
     166             : #include "replication/origin.h"
     167             : #include "replication/worker_internal.h"
     168             : #include "storage/ipc.h"
     169             : #include "storage/lmgr.h"
     170             : #include "tcop/tcopprot.h"
     171             : #include "utils/inval.h"
     172             : #include "utils/memutils.h"
     173             : #include "utils/syscache.h"
     174             : 
     175             : #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
     176             : 
     177             : /*
     178             :  * DSM keys for parallel apply worker. Unlike other parallel execution code,
     179             :  * since we don't need to worry about DSM keys conflicting with plan_node_id we
     180             :  * can use small integers.
     181             :  */
     182             : #define PARALLEL_APPLY_KEY_SHARED       1
     183             : #define PARALLEL_APPLY_KEY_MQ           2
     184             : #define PARALLEL_APPLY_KEY_ERROR_QUEUE  3
     185             : 
     186             : /* Queue size of DSM, 16 MB for now. */
     187             : #define DSM_QUEUE_SIZE  (16 * 1024 * 1024)
     188             : 
     189             : /*
     190             :  * Error queue size of DSM. It is desirable to make it large enough that a
     191             :  * typical ErrorResponse can be sent without blocking. That way, a worker that
     192             :  * errors out can write the whole message into the queue and terminate without
     193             :  * waiting for the user backend.
     194             :  */
     195             : #define DSM_ERROR_QUEUE_SIZE            (16 * 1024)
     196             : 
     197             : /*
     198             :  * There are three fields in each message received by the parallel apply
     199             :  * worker: start_lsn, end_lsn and send_time. Because we have updated these
     200             :  * statistics in the leader apply worker, we can ignore these fields in the
     201             :  * parallel apply worker (see function LogicalRepApplyLoop).
     202             :  */
     203             : #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
     204             : 
     205             : /*
     206             :  * The type of session-level lock on a transaction being applied on a logical
     207             :  * replication subscriber.
     208             :  */
     209             : #define PARALLEL_APPLY_LOCK_STREAM  0
     210             : #define PARALLEL_APPLY_LOCK_XACT    1
     211             : 
     212             : /*
     213             :  * Hash table entry to map xid to the parallel apply worker state.
     214             :  */
     215             : typedef struct ParallelApplyWorkerEntry
     216             : {
     217             :     TransactionId xid;          /* Hash key -- must be first */
     218             :     ParallelApplyWorkerInfo *winfo;
     219             : } ParallelApplyWorkerEntry;
     220             : 
     221             : /*
     222             :  * A hash table used to cache the state of streaming transactions being applied
     223             :  * by the parallel apply workers.
     224             :  */
     225             : static HTAB *ParallelApplyTxnHash = NULL;
     226             : 
     227             : /*
     228             :  * A list (pool) of active parallel apply workers. The information for
     229             :  * the new worker is added to the list after successfully launching it. The
     230             :  * list entry is removed if there are already enough workers in the worker
     231             :  * pool at the end of the transaction. For more information about the worker
     232             :  * pool, see comments atop this file.
     233             :  */
     234             : static List *ParallelApplyWorkerPool = NIL;
     235             : 
     236             : /*
     237             :  * Information shared between leader apply worker and parallel apply worker.
     238             :  */
     239             : ParallelApplyWorkerShared *MyParallelShared = NULL;
     240             : 
     241             : /*
     242             :  * Is there a message sent by a parallel apply worker that the leader apply
     243             :  * worker needs to receive?
     244             :  */
     245             : volatile sig_atomic_t ParallelApplyMessagePending = false;
     246             : 
     247             : /*
     248             :  * Cache the parallel apply worker information required for applying the
     249             :  * current streaming transaction. It is used to save the cost of searching the
     250             :  * hash table when applying the changes between STREAM_START and STREAM_STOP.
     251             :  */
     252             : static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
     253             : 
     254             : /* A list to maintain subtransactions, if any. */
     255             : static List *subxactlist = NIL;
     256             : 
     257             : static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
     258             : static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
     259             : static PartialFileSetState pa_get_fileset_state(void);
     260             : 
     261             : /*
     262             :  * Returns true if it is OK to start a parallel apply worker, false otherwise.
     263             :  */
     264             : static bool
     265         164 : pa_can_start(void)
     266             : {
     267             :     /* Only leader apply workers can start parallel apply workers. */
     268         164 :     if (!am_leader_apply_worker())
     269          54 :         return false;
     270             : 
     271             :     /*
     272             :      * It is good to check for any change in the subscription parameter to
     273             :      * avoid the case where for a very long time the change doesn't get
     274             :      * reflected. This can happen when there is a constant flow of streaming
     275             :      * transactions that are handled by parallel apply workers.
     276             :      *
     277             :      * It is better to do it before the below checks so that the latest values
     278             :      * of subscription can be used for the checks.
     279             :      */
     280         110 :     maybe_reread_subscription();
     281             : 
     282             :     /*
     283             :      * Don't start a new parallel apply worker if the subscription is not
     284             :      * using parallel streaming mode, or if the publisher does not support
     285             :      * parallel apply.
     286             :      */
     287         110 :     if (!MyLogicalRepWorker->parallel_apply)
     288          56 :         return false;
     289             : 
     290             :     /*
     291             :      * Don't start a new parallel worker if the user has set skiplsn as it's
     292             :      * possible that they want to skip the streaming transaction. For
     293             :      * streaming transactions, we need to serialize the transaction to a file
     294             :      * so that we can get the last LSN of the transaction to judge whether to
     295             :      * skip before starting to apply the change.
     296             :      *
     297             :      * One might think that we could allow parallelism if the first lsn of the
     298             :      * transaction is greater than skiplsn, but we don't send it with the
     299             :      * STREAM START message, and it doesn't seem worth sending the extra eight
     300             :      * bytes with the STREAM START to enable parallelism for this case.
     301             :      */
     302          54 :     if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
     303           0 :         return false;
     304             : 
     305             :     /*
     306             :      * For streaming transactions that are being applied using a parallel
     307             :      * apply worker, we cannot decide whether to apply the change for a
     308             :      * relation that is not in the READY state (see
     309             :      * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
     310             :      * time. So, we don't start the new parallel apply worker in this case.
     311             :      */
     312          54 :     if (!AllTablesyncsReady())
     313           0 :         return false;
     314             : 
     315          54 :     return true;
     316             : }
     317             : 
     318             : /*
     319             :  * Set up a dynamic shared memory segment.
     320             :  *
     321             :  * We set up a control region that contains a fixed-size worker info
     322             :  * (ParallelApplyWorkerShared), a message queue, and an error queue.
     323             :  *
     324             :  * Returns true on success, false on failure.
     325             :  */
     326             : static bool
     327          20 : pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
     328             : {
     329             :     shm_toc_estimator e;
     330             :     Size        segsize;
     331             :     dsm_segment *seg;
     332             :     shm_toc    *toc;
     333             :     ParallelApplyWorkerShared *shared;
     334             :     shm_mq     *mq;
     335          20 :     Size        queue_size = DSM_QUEUE_SIZE;
     336          20 :     Size        error_queue_size = DSM_ERROR_QUEUE_SIZE;
     337             : 
     338             :     /*
     339             :      * Estimate how much shared memory we need.
     340             :      *
     341             :      * Because the TOC machinery may choose to insert padding of oddly-sized
     342             :      * requests, we must estimate each chunk separately.
     343             :      *
     344             :      * We need one key to register the location of the header, and two other
     345             :      * keys to track the locations of the message queue and the error message
     346             :      * queue.
     347             :      */
     348          20 :     shm_toc_initialize_estimator(&e);
     349          20 :     shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
     350          20 :     shm_toc_estimate_chunk(&e, queue_size);
     351          20 :     shm_toc_estimate_chunk(&e, error_queue_size);
     352             : 
     353          20 :     shm_toc_estimate_keys(&e, 3);
     354          20 :     segsize = shm_toc_estimate(&e);
     355             : 
     356             :     /* Create the shared memory segment and establish a table of contents. */
     357          20 :     seg = dsm_create(shm_toc_estimate(&e), 0);
     358          20 :     if (!seg)
     359           0 :         return false;
     360             : 
     361          20 :     toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
     362             :                          segsize);
     363             : 
     364             :     /* Set up the header region. */
     365          20 :     shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
     366          20 :     SpinLockInit(&shared->mutex);
     367             : 
     368          20 :     shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     369          20 :     pg_atomic_init_u32(&(shared->pending_stream_count), 0);
     370          20 :     shared->last_commit_end = InvalidXLogRecPtr;
     371          20 :     shared->fileset_state = FS_EMPTY;
     372             : 
     373          20 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
     374             : 
     375             :     /* Set up message queue for the worker. */
     376          20 :     mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
     377          20 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
     378          20 :     shm_mq_set_sender(mq, MyProc);
     379             : 
     380             :     /* Attach the queue. */
     381          20 :     winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
     382             : 
     383             :     /* Set up error queue for the worker. */
     384          20 :     mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
     385             :                        error_queue_size);
     386          20 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
     387          20 :     shm_mq_set_receiver(mq, MyProc);
     388             : 
     389             :     /* Attach the queue. */
     390          20 :     winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
     391             : 
     392             :     /* Return results to caller. */
     393          20 :     winfo->dsm_seg = seg;
     394          20 :     winfo->shared = shared;
     395             : 
     396          20 :     return true;
     397             : }
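
The comment in pa_setup_dsm() says each chunk must be estimated separately because padding may be inserted per request. A small sketch of why that matters follows. It is a standalone illustration and does not call the shm_toc API: `SKETCH_ALIGN` is an assumed alignment chosen for the example, and all function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed alignment for this sketch only; not taken from the listing. */
#define SKETCH_ALIGN ((size_t) 32)

/* Round n up to the next multiple of SKETCH_ALIGN (a power of two). */
static size_t
sketch_align_up(size_t n)
{
    return (n + SKETCH_ALIGN - 1) & ~(SKETCH_ALIGN - 1);
}

/* Estimate with every chunk padded on its own. */
static size_t
sketch_estimate_separately(const size_t *chunks, size_t nchunks)
{
    size_t      total = 0;
    size_t      i;

    assert(chunks != NULL || nchunks == 0);
    for (i = 0; i < nchunks; i++)
        total += sketch_align_up(chunks[i]);
    return total;
}

/* Naive estimate: add the raw sizes and pad once at the end. */
static size_t
sketch_estimate_combined(const size_t *chunks, size_t nchunks)
{
    size_t      total = 0;
    size_t      i;

    assert(chunks != NULL || nchunks == 0);
    for (i = 0; i < nchunks; i++)
        total += chunks[i];
    return sketch_align_up(total);
}
```

For two chunks of 10 and 40 bytes, the per-chunk estimate is 32 + 64 = 96 bytes while the combined estimate is only 64, so a segment sized from the combined figure would be too small once each allocation is padded.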
     398             : 
     399             : /*
     400             :  * Try to get a parallel apply worker from the pool. If none is available then
     401             :  * start a new one.
     402             :  */
     403             : static ParallelApplyWorkerInfo *
     404          54 : pa_launch_parallel_worker(void)
     405             : {
     406             :     MemoryContext oldcontext;
     407             :     bool        launched;
     408             :     ParallelApplyWorkerInfo *winfo;
     409             :     ListCell   *lc;
     410             : 
     411             :     /* Try to get an available parallel apply worker from the worker pool. */
     412          58 :     foreach(lc, ParallelApplyWorkerPool)
     413             :     {
     414          38 :         winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     415             : 
     416          38 :         if (!winfo->in_use)
     417          34 :             return winfo;
     418             :     }
     419             : 
     420             :     /*
     421             :      * Start a new parallel apply worker.
     422             :      *
     423             :      * The worker info can be used for the lifetime of the worker process, so
     424             :      * create it in a permanent context.
     425             :      */
     426          20 :     oldcontext = MemoryContextSwitchTo(ApplyContext);
     427             : 
     428          20 :     winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
     429             : 
     430             :     /* Set up shared memory. */
     431          20 :     if (!pa_setup_dsm(winfo))
     432             :     {
     433           0 :         MemoryContextSwitchTo(oldcontext);
     434           0 :         pfree(winfo);
     435           0 :         return NULL;
     436             :     }
     437             : 
     438          20 :     launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
     439          20 :                                         MyLogicalRepWorker->dbid,
     440          20 :                                         MySubscription->oid,
     441          20 :                                         MySubscription->name,
     442          20 :                                         MyLogicalRepWorker->userid,
     443             :                                         InvalidOid,
     444             :                                         dsm_segment_handle(winfo->dsm_seg));
     445             : 
     446          20 :     if (launched)
     447             :     {
     448          20 :         ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
     449             :     }
     450             :     else
     451             :     {
     452           0 :         pa_free_worker_info(winfo);
     453           0 :         winfo = NULL;
     454             :     }
     455             : 
     456          20 :     MemoryContextSwitchTo(oldcontext);
     457             : 
     458          20 :     return winfo;
     459             : }
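
The reuse-before-launch pattern in pa_launch_parallel_worker() can be shown in isolation. This is a simplified sketch under stated assumptions, not the PostgreSQL implementation: it uses a fixed-size array instead of a List, the `Sketch*` types and functions are hypothetical, and it marks the worker as in use at acquisition time, whereas in the listing that flag is set later by pa_allocate_worker().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_POOL_CAPACITY 4  /* hypothetical cap for this sketch */

typedef struct SketchWorker
{
    bool        in_use;         /* currently applying a transaction? */
    int         id;             /* stand-in for real worker state */
} SketchWorker;

typedef struct SketchPool
{
    SketchWorker workers[SKETCH_POOL_CAPACITY];
    size_t      nworkers;       /* number of workers launched so far */
} SketchPool;

/*
 * Return an idle worker if the pool has one; otherwise "launch" a new
 * worker by appending it.  Returns NULL when the pool is full and busy.
 */
static SketchWorker *
sketch_acquire_worker(SketchPool *pool)
{
    size_t      i;
    SketchWorker *w;

    assert(pool != NULL);

    for (i = 0; i < pool->nworkers; i++)
    {
        if (!pool->workers[i].in_use)
        {
            pool->workers[i].in_use = true;
            return &pool->workers[i];
        }
    }

    if (pool->nworkers == SKETCH_POOL_CAPACITY)
        return NULL;

    w = &pool->workers[pool->nworkers];
    w->id = (int) pool->nworkers;
    w->in_use = true;
    pool->nworkers++;
    return w;
}

/* Mark a worker as available for re-use once its transaction is done. */
static void
sketch_release_worker(SketchWorker *w)
{
    assert(w != NULL);
    w->in_use = false;
}
```

Acquiring twice launches two workers; releasing the first and acquiring again hands the same worker back without growing the pool, which is the saving the worker pool is designed to provide.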
     460             : 
     461             : /*
     462             :  * Allocate a parallel apply worker that will be used for the specified xid.
     463             :  *
     464             :  * We first try to get an available worker from the pool; if none is idle, we
     465             :  * try to launch a new worker. On successful allocation, remember the worker
     466             :  * information in the hash table so that we can get it later for processing the
     467             :  * streaming changes.
     468             :  */
     469             : void
     470         164 : pa_allocate_worker(TransactionId xid)
     471             : {
     472             :     bool        found;
     473         164 :     ParallelApplyWorkerInfo *winfo = NULL;
     474             :     ParallelApplyWorkerEntry *entry;
     475             : 
     476         164 :     if (!pa_can_start())
     477         110 :         return;
     478             : 
     479          54 :     winfo = pa_launch_parallel_worker();
     480          54 :     if (!winfo)
     481           0 :         return;
     482             : 
     483             :     /* First time through, initialize parallel apply worker state hashtable. */
     484          54 :     if (!ParallelApplyTxnHash)
     485             :     {
     486             :         HASHCTL     ctl;
     487             : 
     488         182 :         MemSet(&ctl, 0, sizeof(ctl));
     489          14 :         ctl.keysize = sizeof(TransactionId);
     490          14 :         ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
     491          14 :         ctl.hcxt = ApplyContext;
     492             : 
     493          14 :         ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
     494             :                                            16, &ctl,
     495             :                                            HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     496             :     }
     497             : 
     498             :     /* Create an entry for the requested transaction. */
     499          54 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
     500          54 :     if (found)
     501           0 :         elog(ERROR, "hash table corrupted");
     502             : 
     503             :     /* Update the transaction information in shared memory. */
     504          54 :     SpinLockAcquire(&winfo->shared->mutex);
     505          54 :     winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     506          54 :     winfo->shared->xid = xid;
     507          54 :     SpinLockRelease(&winfo->shared->mutex);
     508             : 
     509          54 :     winfo->in_use = true;
     510          54 :     winfo->serialize_changes = false;
     511          54 :     entry->winfo = winfo;
     512             : }
     513             : 
     514             : /*
     515             :  * Find the assigned worker for the given transaction, if any.
     516             :  */
     517             : ParallelApplyWorkerInfo *
     518      514478 : pa_find_worker(TransactionId xid)
     519             : {
     520             :     bool        found;
     521             :     ParallelApplyWorkerEntry *entry;
     522             : 
     523      514478 :     if (!TransactionIdIsValid(xid))
     524      160142 :         return NULL;
     525             : 
     526      354336 :     if (!ParallelApplyTxnHash)
     527      206474 :         return NULL;
     528             : 
     529             :     /* Return the cached parallel apply worker if valid. */
     530      147862 :     if (stream_apply_worker)
     531      147288 :         return stream_apply_worker;
     532             : 
     533             :     /* Find an entry for the requested transaction. */
     534         574 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
     535         574 :     if (found)
     536             :     {
     537             :         /* The worker must not have exited.  */
     538             :         Assert(entry->winfo->in_use);
     539         574 :         return entry->winfo;
     540             :     }
     541             : 
     542           0 :     return NULL;
     543             : }
     544             : 
     545             : /*
     546             :  * Makes the worker available for reuse.
     547             :  *
     548             :  * This removes the parallel apply worker entry from the hash table so that it
     549             :  * can't be used. If there are enough workers in the pool, it stops the worker
     550             :  * and frees the corresponding info. Otherwise it just marks the worker as
     551             :  * available for reuse.
     552             :  *
     553             :  * For more information about the worker pool, see comments atop this file.
     554             :  */
     555             : static void
     556          48 : pa_free_worker(ParallelApplyWorkerInfo *winfo)
     557             : {
     558             :     Assert(!am_parallel_apply_worker());
     559             :     Assert(winfo->in_use);
     560             :     Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
     561             : 
     562          48 :     if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
     563           0 :         elog(ERROR, "hash table corrupted");
     564             : 
     565             :     /*
     566             :      * Stop the worker if there are enough workers in the pool.
     567             :      *
     568             :      * XXX Additionally, we stop the worker if the leader apply worker
     569             :      * serializes part of the transaction data due to a send timeout. This is
     570             :      * because the message could be partially written to the queue and there
     571             :      * is no way to clean the queue other than resending the message until it
     572             :      * succeeds. Instead of trying to send the data which anyway would have
     573             :      * been serialized and then letting the parallel apply worker deal with
     574             :      * the spurious message, we stop the worker.
     575             :      */
     576          48 :     if (winfo->serialize_changes ||
     577          40 :         list_length(ParallelApplyWorkerPool) >
     578          40 :         (max_parallel_apply_workers_per_subscription / 2))
     579             :     {
     580          10 :         logicalrep_pa_worker_stop(winfo);
     581          10 :         pa_free_worker_info(winfo);
     582             : 
     583          10 :         return;
     584             :     }
     585             : 
     586          38 :     winfo->in_use = false;
     587          38 :     winfo->serialize_changes = false;
     588             : }
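
The stop-or-keep decision in pa_free_worker() can be isolated as a small predicate. The following is a minimal sketch, not the PostgreSQL implementation: the function name and its plain-integer parameters are hypothetical stand-ins for winfo->serialize_changes, list_length(ParallelApplyWorkerPool), and max_parallel_apply_workers_per_subscription.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical stand-in for the decision made in pa_free_worker(): stop the
 * worker when it had changes serialized to a file, or when the pool already
 * holds more than half of the configured maximum. Otherwise the worker is
 * kept for reuse.
 */
static bool
pa_should_stop_worker_sketch(bool serialize_changes, int pool_len,
                             int max_workers)
{
    assert(pool_len >= 0 && max_workers >= 0);

    /* Integer division, as in the original condition. */
    return serialize_changes || pool_len > (max_workers / 2);
}
```

With a maximum of 2, a pool of one worker is kept and a pool of two triggers a stop. A worker whose changes were serialized is stopped regardless of pool size.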
     589             : 
     590             : /*
     591             :  * Free the parallel apply worker information and unlink the files with
     592             :  * serialized changes if any.
     593             :  */
     594             : static void
     595          10 : pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
     596             : {
     597             :     Assert(winfo);
     598             : 
     599          10 :     if (winfo->mq_handle)
     600          10 :         shm_mq_detach(winfo->mq_handle);
     601             : 
     602          10 :     if (winfo->error_mq_handle)
     603           0 :         shm_mq_detach(winfo->error_mq_handle);
     604             : 
     605             :     /* Unlink the files with serialized changes. */
     606          10 :     if (winfo->serialize_changes)
     607           8 :         stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
     608             : 
     609          10 :     if (winfo->dsm_seg)
     610          10 :         dsm_detach(winfo->dsm_seg);
     611             : 
     612             :     /* Remove from the worker pool. */
     613          10 :     ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
     614             : 
     615          10 :     pfree(winfo);
     616          10 : }
     617             : 
     618             : /*
     619             :  * Detach the error queue for all parallel apply workers.
     620             :  */
     621             : void
     622         410 : pa_detach_all_error_mq(void)
     623             : {
     624             :     ListCell   *lc;
     625             : 
     626         420 :     foreach(lc, ParallelApplyWorkerPool)
     627             :     {
     628          10 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     629             : 
     630          10 :         if (winfo->error_mq_handle)
     631             :         {
     632          10 :             shm_mq_detach(winfo->error_mq_handle);
     633          10 :             winfo->error_mq_handle = NULL;
     634             :         }
     635             :     }
     636         410 : }
     637             : 
     638             : /*
     639             :  * Check if there are any pending spooled messages.
     640             :  */
     641             : static bool
     642          32 : pa_has_spooled_message_pending(void)
     643             : {
     644             :     PartialFileSetState fileset_state;
     645             : 
     646          32 :     fileset_state = pa_get_fileset_state();
     647             : 
     648          32 :     return (fileset_state != FS_EMPTY);
     649             : }
     650             : 
     651             : /*
     652             :  * Replay the spooled messages once the leader apply worker has finished
     653             :  * serializing changes to the file.
     654             :  *
     655             :  * Returns false if there aren't any pending spooled messages, true otherwise.
     656             :  */
     657             : static bool
     658         116 : pa_process_spooled_messages_if_required(void)
     659             : {
     660             :     PartialFileSetState fileset_state;
     661             : 
     662         116 :     fileset_state = pa_get_fileset_state();
     663             : 
     664         116 :     if (fileset_state == FS_EMPTY)
     665         100 :         return false;
     666             : 
     667             :     /*
     668             :      * If the leader apply worker is busy serializing the partial changes then
     669             :      * acquire the stream lock now and wait for the leader worker to finish
     670             :      * serializing the changes. Otherwise, the parallel apply worker won't get
     671             :      * a chance to receive a STREAM_STOP (and acquire the stream lock) until
     672             :      * the leader has serialized all changes, which can lead to an undetected
     673             :      * deadlock.
     674             :      *
     675             :      * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
     676             :      * worker has finished serializing the changes.
     677             :      */
     678          16 :     if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
     679             :     {
     680           0 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
     681           0 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
     682             : 
     683           0 :         fileset_state = pa_get_fileset_state();
     684             :     }
     685             : 
     686             :     /*
     687             :      * We cannot read the file immediately after the leader has serialized all
     688             :      * changes to the file because there may still be messages in the memory
     689             :      * queue. We will apply all spooled messages the next time we call this
     690             :      * function and that will ensure there are no messages left in the memory
     691             :      * queue.
     692             :      */
     693          16 :     if (fileset_state == FS_SERIALIZE_DONE)
     694             :     {
     695           8 :         pa_set_fileset_state(MyParallelShared, FS_READY);
     696             :     }
     697           8 :     else if (fileset_state == FS_READY)
     698             :     {
     699           8 :         apply_spooled_messages(&MyParallelShared->fileset,
     700           8 :                                MyParallelShared->xid,
     701             :                                InvalidXLogRecPtr);
     702           8 :         pa_set_fileset_state(MyParallelShared, FS_EMPTY);
     703             :     }
     704             : 
     705          16 :     return true;
     706             : }
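
The two-step replay in pa_process_spooled_messages_if_required() can be modeled as a small state machine. This is a simplified sketch under stated assumptions: all Sketch* and SK_* names are hypothetical, and the stream-lock wait is replaced by assuming the leader has finished serializing by the time the state is re-read. The point it illustrates is that the file is replayed only on the call after FS_SERIALIZE_DONE was observed.

```c
#include <stdbool.h>

/* Hypothetical mirror of the fileset states used in the listing. */
typedef enum
{
    SK_FS_EMPTY,
    SK_FS_SERIALIZE_IN_PROGRESS,
    SK_FS_SERIALIZE_DONE,
    SK_FS_READY
} SketchFileSetState;

typedef struct
{
    SketchFileSetState state;
    int         replayed;       /* how many times the file was replayed */
} SketchWorker;

/* Returns false if nothing is pending, true otherwise. */
static bool
sketch_process_spooled(SketchWorker *w)
{
    if (w->state == SK_FS_EMPTY)
        return false;

    /*
     * The real code waits on the stream lock here and re-reads the state.
     * Assumption: the leader is done once the wait returns.
     */
    if (w->state == SK_FS_SERIALIZE_IN_PROGRESS)
        w->state = SK_FS_SERIALIZE_DONE;

    if (w->state == SK_FS_SERIALIZE_DONE)
    {
        /* Defer the replay so the in-memory queue can drain first. */
        w->state = SK_FS_READY;
    }
    else if (w->state == SK_FS_READY)
    {
        w->replayed++;          /* stands in for apply_spooled_messages() */
        w->state = SK_FS_EMPTY;
    }

    return true;
}
```

Starting from SK_FS_SERIALIZE_DONE, the first call only advances to SK_FS_READY, the second call replays and resets to SK_FS_EMPTY, and a third call reports that nothing is pending.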
     707             : 
     708             : /*
     709             :  * Interrupt handler for main loop of parallel apply worker.
     710             :  */
     711             : static void
     712      128782 : ProcessParallelApplyInterrupts(void)
     713             : {
     714      128782 :     CHECK_FOR_INTERRUPTS();
     715             : 
     716      128776 :     if (ShutdownRequestPending)
     717             :     {
     718          10 :         ereport(LOG,
     719             :                 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
     720             :                         MySubscription->name)));
     721             : 
     722          10 :         proc_exit(0);
     723             :     }
     724             : 
     725      128766 :     if (ConfigReloadPending)
     726             :     {
     727           8 :         ConfigReloadPending = false;
     728           8 :         ProcessConfigFile(PGC_SIGHUP);
     729             :     }
     730      128766 : }
     731             : 
     732             : /* Parallel apply worker main loop. */
     733             : static void
     734          20 : LogicalParallelApplyLoop(shm_mq_handle *mqh)
     735             : {
     736             :     shm_mq_result shmq_res;
     737             :     ErrorContextCallback errcallback;
     738          20 :     MemoryContext oldcxt = CurrentMemoryContext;
     739             : 
     740             :     /*
     741             :      * Init the ApplyMessageContext which we clean up after each replication
     742             :      * protocol message.
     743             :      */
     744          20 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
     745             :                                                 "ApplyMessageContext",
     746             :                                                 ALLOCSET_DEFAULT_SIZES);
     747             : 
     748             :     /*
     749             :      * Push apply error context callback. Fields will be filled while applying
     750             :      * a change.
     751             :      */
     752          20 :     errcallback.callback = apply_error_callback;
     753          20 :     errcallback.previous = error_context_stack;
     754          20 :     error_context_stack = &errcallback;
     755             : 
     756             :     for (;;)
     757      128762 :     {
     758             :         void       *data;
     759             :         Size        len;
     760             : 
     761      128782 :         ProcessParallelApplyInterrupts();
     762             : 
     763             :         /* Ensure we are reading the data into our memory context. */
     764      128766 :         MemoryContextSwitchTo(ApplyMessageContext);
     765             : 
     766      128766 :         shmq_res = shm_mq_receive(mqh, &len, &data, true);
     767             : 
     768      128766 :         if (shmq_res == SHM_MQ_SUCCESS)
     769             :         {
     770             :             StringInfoData s;
     771             :             int         c;
     772             : 
     773      128650 :             if (len == 0)
     774           0 :                 elog(ERROR, "invalid message length");
     775             : 
     776      128650 :             initReadOnlyStringInfo(&s, data, len);
     777             : 
     778             :             /*
     779             :              * The first byte of messages sent from leader apply worker to
     780             :              * parallel apply workers can only be 'w'.
     781             :              */
     782      128650 :             c = pq_getmsgbyte(&s);
     783      128650 :             if (c != 'w')
     784           0 :                 elog(ERROR, "unexpected message \"%c\"", c);
     785             : 
     786             :             /*
     787             :              * Ignore statistics fields that have been updated by the leader
     788             :              * apply worker.
     789             :              *
     790             :              * XXX We can avoid sending the statistics fields from the leader
     791             :              * apply worker but for that, it needs to rebuild the entire
     792             :              * message by removing these fields which could be more work than
     793             :              * simply ignoring these fields in the parallel apply worker.
     794             :              */
     795      128650 :             s.cursor += SIZE_STATS_MESSAGE;
     796             : 
     797      128650 :             apply_dispatch(&s);
     798             :         }
     799         116 :         else if (shmq_res == SHM_MQ_WOULD_BLOCK)
     800             :         {
     801             :             /* Replay the changes from the file, if any. */
     802         116 :             if (!pa_process_spooled_messages_if_required())
     803             :             {
     804             :                 int         rc;
     805             : 
     806             :                 /* Wait for more work. */
     807         100 :                 rc = WaitLatch(MyLatch,
     808             :                                WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     809             :                                1000L,
     810             :                                WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
     811             : 
     812         100 :                 if (rc & WL_LATCH_SET)
     813          96 :                     ResetLatch(MyLatch);
     814             :             }
     815             :         }
     816             :         else
     817             :         {
     818             :             Assert(shmq_res == SHM_MQ_DETACHED);
     819             : 
     820           0 :             ereport(ERROR,
     821             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     822             :                      errmsg("lost connection to the logical replication apply worker")));
     823             :         }
     824             : 
     825      128762 :         MemoryContextReset(ApplyMessageContext);
     826      128762 :         MemoryContextSwitchTo(oldcxt);
     827             :     }
     828             : 
     829             :     /* Pop the error context stack. */
     830             :     error_context_stack = errcallback.previous;
     831             : 
     832             :     MemoryContextSwitchTo(oldcxt);
     833             : }
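
The header handling in the SHM_MQ_SUCCESS branch above (reject empty messages, require a leading 'w', then skip the statistics fields) can be sketched as a standalone check. The names below are hypothetical. The 24-byte header size is an assumption for illustration, since the definition of SIZE_STATS_MESSAGE does not appear in this part of the listing. The sketch returns -1 where the real code raises an error, and it adds an explicit length check that the listing does not show.

```c
#include <stddef.h>

/* Assumed size of the statistics fields; not taken from this listing. */
#define SKETCH_STATS_BYTES 24

/*
 * Return the offset of the payload that would be handed to the dispatcher,
 * or -1 if the message is empty, has the wrong type byte, or is too short.
 */
static long
sketch_payload_offset(const unsigned char *data, size_t len)
{
    if (len == 0)
        return -1;              /* "invalid message length" */

    if (data[0] != 'w')
        return -1;              /* "unexpected message" */

    /* Extra bounds check added by this sketch. */
    if (len < 1 + SKETCH_STATS_BYTES)
        return -1;

    return 1 + SKETCH_STATS_BYTES;
}
```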
     834             : 
     835             : /*
     836             :  * Make sure the leader apply worker tries to read from our error queue one more
     837             :  * time. This guards against the case where we exit uncleanly without sending
     838             :  * an ErrorResponse, for example because some code calls proc_exit directly.
     839             :  *
     840             :  * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
     841             :  * if any. See ParallelWorkerShutdown for details.
     842             :  */
     843             : static void
     844          20 : pa_shutdown(int code, Datum arg)
     845             : {
     846          20 :     SendProcSignal(MyLogicalRepWorker->leader_pid,
     847             :                    PROCSIG_PARALLEL_APPLY_MESSAGE,
     848             :                    INVALID_PROC_NUMBER);
     849             : 
     850          20 :     dsm_detach((dsm_segment *) DatumGetPointer(arg));
     851          20 : }
     852             : 
     853             : /*
     854             :  * Parallel apply worker entry point.
     855             :  */
     856             : void
     857          20 : ParallelApplyWorkerMain(Datum main_arg)
     858             : {
     859             :     ParallelApplyWorkerShared *shared;
     860             :     dsm_handle  handle;
     861             :     dsm_segment *seg;
     862             :     shm_toc    *toc;
     863             :     shm_mq     *mq;
     864             :     shm_mq_handle *mqh;
     865             :     shm_mq_handle *error_mqh;
     866             :     RepOriginId originid;
     867          20 :     int         worker_slot = DatumGetInt32(main_arg);
     868             :     char        originname[NAMEDATALEN];
     869             : 
     870          20 :     InitializingApplyWorker = true;
     871             : 
     872             :     /* Setup signal handling. */
     873          20 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     874          20 :     pqsignal(SIGINT, SignalHandlerForShutdownRequest);
     875          20 :     pqsignal(SIGTERM, die);
     876          20 :     BackgroundWorkerUnblockSignals();
     877             : 
     878             :     /*
     879             :      * Attach to the dynamic shared memory segment for the parallel apply, and
     880             :      * find its table of contents.
     881             :      *
     882             :      * As with parallel query, we don't need a resource owner at this point. See
     883             :      * ParallelWorkerMain.
     884             :      */
     885          20 :     memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
     886          20 :     seg = dsm_attach(handle);
     887          20 :     if (!seg)
     888           0 :         ereport(ERROR,
     889             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     890             :                  errmsg("could not map dynamic shared memory segment")));
     891             : 
     892          20 :     toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
     893          20 :     if (!toc)
     894           0 :         ereport(ERROR,
     895             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     896             :                  errmsg("invalid magic number in dynamic shared memory segment")));
     897             : 
     898             :     /* Look up the shared information. */
     899          20 :     shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
     900          20 :     MyParallelShared = shared;
     901             : 
     902             :     /*
     903             :      * Attach to the message queue.
     904             :      */
     905          20 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
     906          20 :     shm_mq_set_receiver(mq, MyProc);
     907          20 :     mqh = shm_mq_attach(mq, seg, NULL);
     908             : 
     909             :     /*
     910             :      * Primary initialization is complete. Now, we can attach to our slot.
     911             :      * This is to ensure that the leader apply worker does not write data to
     912             :      * the uninitialized memory queue.
     913             :      */
     914          20 :     logicalrep_worker_attach(worker_slot);
     915             : 
     916             :     /*
     917             :      * Register the shutdown callback after we are attached to the worker
     918             :      * slot. This is to ensure that MyLogicalRepWorker remains valid when this
     919             :      * callback is invoked.
     920             :      */
     921          20 :     before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
     922             : 
     923          20 :     SpinLockAcquire(&MyParallelShared->mutex);
     924          20 :     MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
     925          20 :     MyParallelShared->logicalrep_worker_slot_no = worker_slot;
     926          20 :     SpinLockRelease(&MyParallelShared->mutex);
     927             : 
     928             :     /*
     929             :      * Attach to the error queue.
     930             :      */
     931          20 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
     932          20 :     shm_mq_set_sender(mq, MyProc);
     933          20 :     error_mqh = shm_mq_attach(mq, seg, NULL);
     934             : 
     935          20 :     pq_redirect_to_shm_mq(seg, error_mqh);
     936          20 :     pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
     937             :                            INVALID_PROC_NUMBER);
     938             : 
     939          20 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
     940          20 :         MyLogicalRepWorker->reply_time = 0;
     941             : 
     942          20 :     InitializeLogRepWorker();
     943             : 
     944          20 :     InitializingApplyWorker = false;
     945             : 
     946             :     /* Setup replication origin tracking. */
     947          20 :     StartTransactionCommand();
     948          20 :     ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
     949             :                                        originname, sizeof(originname));
     950          20 :     originid = replorigin_by_name(originname, false);
     951             : 
     952             :     /*
     953             :      * The parallel apply worker doesn't need to monopolize this replication
     954             :      * origin which was already acquired by its leader process.
     955             :      */
     956          20 :     replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
     957          20 :     replorigin_session_origin = originid;
     958          20 :     CommitTransactionCommand();
     959             : 
     960             :     /*
     961             :      * Setup callback for syscache so that we know when something changes in
     962             :      * the subscription relation state.
     963             :      */
     964          20 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
     965             :                                   invalidate_syncing_table_states,
     966             :                                   (Datum) 0);
     967             : 
     968          20 :     set_apply_error_context_origin(originname);
     969             : 
     970          20 :     LogicalParallelApplyLoop(mqh);
     971             : 
     972             :     /*
     973             :      * The parallel apply worker must not get here because the parallel apply
     974             :      * worker will only stop when it receives a SIGTERM or SIGINT from the
     975             :      * leader, or when there is an error. None of these cases will allow the
     976             :      * code to reach here.
     977             :      */
     978             :     Assert(false);
     979           0 : }
     980             : 
     981             : /*
     982             :  * Handle receipt of an interrupt indicating a parallel apply worker message.
     983             :  *
     984             :  * Note: this is called within a signal handler! All we can do is set a flag
     985             :  * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
     986             :  * HandleParallelApplyMessages().
     987             :  */
     988             : void
     989          30 : HandleParallelApplyMessageInterrupt(void)
     990             : {
     991          30 :     InterruptPending = true;
     992          30 :     ParallelApplyMessagePending = true;
     993          30 :     SetLatch(MyLatch);
     994          30 : }
     995             : 
     996             : /*
     997             :  * Handle a single protocol message received from a single parallel apply
     998             :  * worker.
     999             :  */
    1000             : static void
    1001           2 : HandleParallelApplyMessage(StringInfo msg)
    1002             : {
    1003             :     char        msgtype;
    1004             : 
    1005           2 :     msgtype = pq_getmsgbyte(msg);
    1006             : 
    1007           2 :     switch (msgtype)
    1008             :     {
    1009           2 :         case 'E':               /* ErrorResponse */
    1010             :             {
    1011             :                 ErrorData   edata;
    1012             : 
    1013             :                 /* Parse ErrorResponse. */
    1014           2 :                 pq_parse_errornotice(msg, &edata);
    1015             : 
    1016             :                 /*
    1017             :                  * If desired, add a context line to show that this is a
    1018             :                  * message propagated from a parallel apply worker. Otherwise,
    1019             :                  * it can sometimes be confusing to understand what actually
    1020             :                  * happened.
    1021             :                  */
    1022           2 :                 if (edata.context)
    1023           2 :                     edata.context = psprintf("%s\n%s", edata.context,
    1024             :                                              _("logical replication parallel apply worker"));
    1025             :                 else
    1026           0 :                     edata.context = pstrdup(_("logical replication parallel apply worker"));
    1027             : 
    1028             :                 /*
    1029             :                  * Context beyond that should use the error context callbacks
    1030             :                  * that were in effect in LogicalRepApplyLoop().
    1031             :                  */
    1032           2 :                 error_context_stack = apply_error_context_stack;
    1033             : 
    1034             :                 /*
    1035             :                  * The actual error must have been reported by the parallel
    1036             :                  * apply worker.
    1037             :                  */
    1038           2 :                 ereport(ERROR,
    1039             :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1040             :                          errmsg("logical replication parallel apply worker exited due to error"),
    1041             :                          errcontext("%s", edata.context)));
    1042             :             }
    1043             : 
    1044             :             /*
    1045             :              * Don't need to do anything about NoticeResponse and
    1046             :              * NotifyResponse as the logical replication worker doesn't need
    1047             :              * to send messages to the client.
    1048             :              */
    1049           0 :         case 'N':
    1050             :         case 'A':
    1051           0 :             break;
    1052             : 
    1053           0 :         default:
    1054           0 :             elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
    1055             :                  msgtype, msg->len);
    1056             :     }
    1057           0 : }
    1058             : 
    1059             : /*
    1060             :  * Handle any queued protocol messages received from parallel apply workers.
    1061             :  */
    1062             : void
    1063          12 : HandleParallelApplyMessages(void)
    1064             : {
    1065             :     ListCell   *lc;
    1066             :     MemoryContext oldcontext;
    1067             : 
    1068             :     static MemoryContext hpam_context = NULL;
    1069             : 
    1070             :     /*
    1071             :      * This is invoked from ProcessInterrupts(), and since some of the
    1072             :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
    1073             :      * for recursive calls if more signals are received while this runs. It's
    1074             :      * unclear that recursive entry would be safe, and it doesn't seem useful
    1075             :      * even if it is safe, so let's block interrupts until done.
    1076             :      */
    1077          12 :     HOLD_INTERRUPTS();
    1078             : 
    1079             :     /*
    1080             :      * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
    1081             :      * don't want to risk leaking data into long-lived contexts, so let's do
    1082             :      * our work here in a private context that we can reset on each use.
    1083             :      */
    1084          12 :     if (!hpam_context)          /* first time through? */
    1085          10 :         hpam_context = AllocSetContextCreate(TopMemoryContext,
    1086             :                                              "HandleParallelApplyMessages",
    1087             :                                              ALLOCSET_DEFAULT_SIZES);
    1088             :     else
    1089           2 :         MemoryContextReset(hpam_context);
    1090             : 
    1091          12 :     oldcontext = MemoryContextSwitchTo(hpam_context);
    1092             : 
    1093          12 :     ParallelApplyMessagePending = false;
    1094             : 
    1095          24 :     foreach(lc, ParallelApplyWorkerPool)
    1096             :     {
    1097             :         shm_mq_result res;
    1098             :         Size        nbytes;
    1099             :         void       *data;
    1100          14 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
    1101             : 
    1102             :         /*
    1103             :          * The leader will detach from the error queue and set it to NULL
    1104             :          * before preparing to stop all parallel apply workers, so we don't
    1105             :          * need to handle error messages anymore. See
    1106             :          * logicalrep_worker_detach.
    1107             :          */
    1108          14 :         if (!winfo->error_mq_handle)
    1109          12 :             continue;
    1110             : 
    1111           4 :         res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
    1112             : 
    1113           4 :         if (res == SHM_MQ_WOULD_BLOCK)
    1114           2 :             continue;
    1115           2 :         else if (res == SHM_MQ_SUCCESS)
    1116             :         {
    1117             :             StringInfoData msg;
    1118             : 
    1119           2 :             initStringInfo(&msg);
    1120           2 :             appendBinaryStringInfo(&msg, data, nbytes);
    1121           2 :             HandleParallelApplyMessage(&msg);
    1122           0 :             pfree(msg.data);
    1123             :         }
    1124             :         else
    1125           0 :             ereport(ERROR,
    1126             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1127             :                      errmsg("lost connection to the logical replication parallel apply worker")));
    1128             :     }
    1129             : 
    1130          10 :     MemoryContextSwitchTo(oldcontext);
    1131             : 
    1132             :     /* Might as well clear the context on our way out */
    1133          10 :     MemoryContextReset(hpam_context);
    1134             : 
    1135          10 :     RESUME_INTERRUPTS();
    1136          10 : }
    1137             : 
    1138             : /*
    1139             :  * Send the data to the specified parallel apply worker via shared-memory
    1140             :  * queue.
    1141             :  *
    1142             :  * Returns false if the attempt to send data via shared memory times out, true
    1143             :  * otherwise.
    1144             :  */
    1145             : bool
    1146      137788 : pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
    1147             : {
    1148             :     int         rc;
    1149             :     shm_mq_result result;
    1150      137788 :     TimestampTz startTime = 0;
    1151             : 
    1152             :     Assert(!IsTransactionState());
    1153             :     Assert(!winfo->serialize_changes);
    1154             : 
    1155             :     /*
    1156             :      * We don't try to send data to the parallel worker in 'immediate' mode,
    1157             :      * which is primarily used for testing purposes.
    1158             :      */
    1159      137788 :     if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
    1160           8 :         return false;
    1161             : 
    1162             : /*
    1163             :  * This timeout is a bit arbitrary but testing revealed that it is sufficient
    1164             :  * to send the message unless the parallel apply worker is waiting on some
    1165             :  * lock or there is a serious resource crunch. See the comments atop this file
    1166             :  * to know why we are using a non-blocking way to send the message.
    1167             :  */
    1168             : #define SHM_SEND_RETRY_INTERVAL_MS 1000
    1169             : #define SHM_SEND_TIMEOUT_MS     (10000 - SHM_SEND_RETRY_INTERVAL_MS)
    1170             : 
    1171             :     for (;;)
    1172             :     {
    1173      137780 :         result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
    1174             : 
    1175      137780 :         if (result == SHM_MQ_SUCCESS)
    1176      137780 :             return true;
    1177           0 :         else if (result == SHM_MQ_DETACHED)
    1178           0 :             ereport(ERROR,
    1179             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1180             :                      errmsg("could not send data to shared-memory queue")));
    1181             : 
    1182             :         Assert(result == SHM_MQ_WOULD_BLOCK);
    1183             : 
    1184             :         /* Wait before retrying. */
    1185           0 :         rc = WaitLatch(MyLatch,
    1186             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1187             :                        SHM_SEND_RETRY_INTERVAL_MS,
    1188             :                        WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
    1189             : 
    1190           0 :         if (rc & WL_LATCH_SET)
    1191             :         {
    1192           0 :             ResetLatch(MyLatch);
    1193           0 :             CHECK_FOR_INTERRUPTS();
    1194             :         }
    1195             : 
    1196           0 :         if (startTime == 0)
    1197           0 :             startTime = GetCurrentTimestamp();
    1198           0 :         else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
    1199             :                                             SHM_SEND_TIMEOUT_MS))
    1200           0 :             return false;
    1201             :     }
    1202             : }
    1203             : 
    1204             : /*
    1205             :  * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
    1206             :  * that the current data and any subsequent data for this transaction will be
    1207             :  * serialized to a file. This is done to prevent possible deadlocks with
    1208             :  * another parallel apply worker (refer to the comments atop this file).
    1209             :  */
    1210             : void
    1211           8 : pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
    1212             :                                bool stream_locked)
    1213             : {
    1214           8 :     ereport(LOG,
    1215             :             (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
    1216             :                     winfo->shared->xid)));
    1217             : 
    1218             :     /*
    1219             :      * The parallel apply worker could be stuck for some reason (say, waiting
    1220             :      * on a lock held by another backend), so stop trying to send data
    1221             :      * directly to it and start serializing data to the file instead.
    1222             :      */
    1223           8 :     winfo->serialize_changes = true;
    1224             : 
    1225             :     /* Initialize the stream fileset. */
    1226           8 :     stream_start_internal(winfo->shared->xid, true);
    1227             : 
    1228             :     /*
    1229             :      * Acquire the stream lock, if not already held, to make sure that the
    1230             :      * parallel apply worker will wait for the leader to release the stream
    1231             :      * lock until the end of the transaction.
    1232             :      */
    1233           8 :     if (!stream_locked)
    1234           8 :         pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
    1235             : 
    1236           8 :     pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
    1237           8 : }
    1238             : 
    1239             : /*
    1240             :  * Wait until the parallel apply worker's transaction state has reached or
    1241             :  * exceeded the given xact_state.
    1242             :  */
    1243             : static void
    1244         496 : pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
    1245             :                        ParallelTransState xact_state)
    1246             : {
    1247             :     for (;;)
    1248             :     {
    1249             :         /*
    1250             :          * Stop if the transaction state has reached or exceeded the given
    1251             :          * xact_state.
    1252             :          */
    1253         496 :         if (pa_get_xact_state(winfo->shared) >= xact_state)
    1254          50 :             break;
    1255             : 
    1256             :         /* Wait to be signalled. */
    1257         446 :         (void) WaitLatch(MyLatch,
    1258             :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1259             :                          10L,
    1260             :                          WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
    1261             : 
    1262             :         /* Reset the latch so we don't spin. */
    1263         446 :         ResetLatch(MyLatch);
    1264             : 
    1265             :         /* An interrupt may have occurred while we were waiting. */
    1266         446 :         CHECK_FOR_INTERRUPTS();
    1267             :     }
    1268          50 : }
    1269             : 
    1270             : /*
    1271             :  * Wait until the parallel apply worker's transaction finishes.
    1272             :  */
    1273             : static void
    1274          50 : pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
    1275             : {
    1276             :     /*
    1277             :      * Wait until the parallel apply worker sets the state to
    1278             :      * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
    1279             :      * lock. This prevents the leader apply worker from acquiring the
    1280             :      * transaction lock earlier than the parallel apply worker.
    1281             :      */
    1282          50 :     pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
    1283             : 
    1284             :     /*
    1285             :      * Wait for the transaction lock to be released. This is required to
    1286             :      * detect deadlock among leader and parallel apply workers. Refer to the
    1287             :      * comments atop this file.
    1288             :      */
    1289          50 :     pa_lock_transaction(winfo->shared->xid, AccessShareLock);
    1290          48 :     pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
    1291             : 
    1292             :     /*
    1293             :      * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
    1294             :      * apply worker failed while applying changes causing the lock to be
    1295             :      * released.
    1296             :      */
    1297          48 :     if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
    1298           0 :         ereport(ERROR,
    1299             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1300             :                  errmsg("lost connection to the logical replication parallel apply worker")));
    1301          48 : }
    1302             : 
    1303             : /*
    1304             :  * Set the transaction state for a given parallel apply worker.
    1305             :  */
    1306             : void
    1307         102 : pa_set_xact_state(ParallelApplyWorkerShared *wshared,
    1308             :                   ParallelTransState xact_state)
    1309             : {
    1310         102 :     SpinLockAcquire(&wshared->mutex);
    1311         102 :     wshared->xact_state = xact_state;
    1312         102 :     SpinLockRelease(&wshared->mutex);
    1313         102 : }
    1314             : 
    1315             : /*
    1316             :  * Get the transaction state for a given parallel apply worker.
    1317             :  */
    1318             : static ParallelTransState
    1319         544 : pa_get_xact_state(ParallelApplyWorkerShared *wshared)
    1320             : {
    1321             :     ParallelTransState xact_state;
    1322             : 
    1323         544 :     SpinLockAcquire(&wshared->mutex);
    1324         544 :     xact_state = wshared->xact_state;
    1325         544 :     SpinLockRelease(&wshared->mutex);
    1326             : 
    1327         544 :     return xact_state;
    1328             : }
    1329             : 
    1330             : /*
    1331             :  * Cache the parallel apply worker information.
    1332             :  */
    1333             : void
    1334        1008 : pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
    1335             : {
    1336        1008 :     stream_apply_worker = winfo;
    1337        1008 : }
    1338             : 
    1339             : /*
    1340             :  * Form a unique savepoint name for the streaming transaction.
    1341             :  *
    1342             :  * Note that different subscriptions for publications on different nodes can
    1343             :  * receive the same remote xid, so we combine it with the subscription id.
    1344             :  *
    1345             :  * Returns the name in the supplied buffer.
    1346             :  */
    1347             : static void
    1348          54 : pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
    1349             : {
    1350          54 :     snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
    1351          54 : }
    1352             : 
    1353             : /*
    1354             :  * Define a savepoint for a subxact in parallel apply worker if needed.
    1355             :  *
    1356             :  * The parallel apply worker can figure out if a new subtransaction was
    1357             :  * started by checking if the new change arrived with a different xid. In that
    1358             :  * case, define a named savepoint, so that we are able to roll back to it
    1359             :  * if required.
    1360             :  */
    1361             : void
    1362      137698 : pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
    1363             : {
    1364      137698 :     if (current_xid != top_xid &&
    1365         104 :         !list_member_xid(subxactlist, current_xid))
    1366             :     {
    1367             :         MemoryContext oldctx;
    1368             :         char        spname[NAMEDATALEN];
    1369             : 
    1370          34 :         pa_savepoint_name(MySubscription->oid, current_xid,
    1371             :                           spname, sizeof(spname));
    1372             : 
    1373          34 :         elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
    1374             : 
    1375             :         /* We must be in a transaction block to define the SAVEPOINT. */
    1376          34 :         if (!IsTransactionBlock())
    1377             :         {
    1378          10 :             if (!IsTransactionState())
    1379           0 :                 StartTransactionCommand();
    1380             : 
    1381          10 :             BeginTransactionBlock();
    1382          10 :             CommitTransactionCommand();
    1383             :         }
    1384             : 
    1385          34 :         DefineSavepoint(spname);
    1386             : 
    1387             :         /*
    1388             :          * CommitTransactionCommand is needed to start a subtransaction after
    1389             :          * issuing a SAVEPOINT inside a transaction block (see
    1390             :          * StartSubTransaction()).
    1391             :          */
    1392          34 :         CommitTransactionCommand();
    1393             : 
    1394          34 :         oldctx = MemoryContextSwitchTo(TopTransactionContext);
    1395          34 :         subxactlist = lappend_xid(subxactlist, current_xid);
    1396          34 :         MemoryContextSwitchTo(oldctx);
    1397             :     }
    1398      137698 : }
    1399             : 
    1400             : /* Reset the list that maintains subtransactions. */
    1401             : void
    1402          48 : pa_reset_subtrans(void)
    1403             : {
    1404             :     /*
    1405             :      * We don't need to free this explicitly as the allocated memory will be
    1406             :      * freed at the transaction end.
    1407             :      */
    1408          48 :     subxactlist = NIL;
    1409          48 : }
    1410             : 
    1411             : /*
    1412             :  * Handle STREAM ABORT message when the transaction was applied in a parallel
    1413             :  * apply worker.
    1414             :  */
    1415             : void
    1416          24 : pa_stream_abort(LogicalRepStreamAbortData *abort_data)
    1417             : {
    1418          24 :     TransactionId xid = abort_data->xid;
    1419          24 :     TransactionId subxid = abort_data->subxid;
    1420             : 
    1421             :     /*
    1422             :      * Update origin state so we can restart streaming from the correct
    1423             :      * position in case of a crash.
    1424             :      */
    1425          24 :     replorigin_session_origin_lsn = abort_data->abort_lsn;
    1426          24 :     replorigin_session_origin_timestamp = abort_data->abort_time;
    1427             : 
    1428             :     /*
    1429             :      * If the two XIDs are the same, this is in fact an abort of the top-level
    1430             :      * xact, so just free the subxactlist.
    1431             :      */
    1432          24 :     if (subxid == xid)
    1433             :     {
    1434           4 :         pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    1435             : 
    1436             :         /*
    1437             :          * Release the lock as we might be processing an empty streaming
    1438             :          * transaction in which case the lock won't be released during
    1439             :          * transaction rollback.
    1440             :          *
    1441             :          * Note that it's ok to release the transaction lock before aborting
    1442             :          * the transaction because even if the parallel apply worker dies due
    1443             :          * to a crash or some other reason, such a transaction would still be
    1444             :          * considered aborted.
    1445             :          */
    1446           4 :         pa_unlock_transaction(xid, AccessExclusiveLock);
    1447             : 
    1448           4 :         AbortCurrentTransaction();
    1449             : 
    1450           4 :         if (IsTransactionBlock())
    1451             :         {
    1452           2 :             EndTransactionBlock(false);
    1453           2 :             CommitTransactionCommand();
    1454             :         }
    1455             : 
    1456           4 :         pa_reset_subtrans();
    1457             : 
    1458           4 :         pgstat_report_activity(STATE_IDLE, NULL);
    1459             :     }
    1460             :     else
    1461             :     {
    1462             :         /* OK, so it's a subxact. Roll back to the savepoint. */
    1463             :         int         i;
    1464             :         char        spname[NAMEDATALEN];
    1465             : 
    1466          20 :         pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
    1467             : 
    1468          20 :         elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
    1469             : 
    1470             :         /*
    1471             :          * Search the subxactlist, determine the offset tracked for the
    1472             :          * subxact, and truncate the list.
    1473             :          *
    1474             :          * Note that for an empty sub-transaction we won't find the subxid
    1475             :          * here.
    1476             :          */
    1477          24 :         for (i = list_length(subxactlist) - 1; i >= 0; i--)
    1478             :         {
    1479          22 :             TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
    1480             : 
    1481          22 :             if (xid_tmp == subxid)
    1482             :             {
    1483          18 :                 RollbackToSavepoint(spname);
    1484          18 :                 CommitTransactionCommand();
    1485          18 :                 subxactlist = list_truncate(subxactlist, i);
    1486          18 :                 break;
    1487             :             }
    1488             :         }
    1489             :     }
    1490          24 : }
    1491             : 
    1492             : /*
    1493             :  * Set the fileset state for a particular parallel apply worker. The fileset
    1494             :  * will be set once the leader worker has serialized all changes to the file
    1495             :  * so that it can be used by the parallel apply worker.
    1496             :  */
    1497             : void
    1498          32 : pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
    1499             :                      PartialFileSetState fileset_state)
    1500             : {
    1501          32 :     SpinLockAcquire(&wshared->mutex);
    1502          32 :     wshared->fileset_state = fileset_state;
    1503             : 
    1504          32 :     if (fileset_state == FS_SERIALIZE_DONE)
    1505             :     {
    1506             :         Assert(am_leader_apply_worker());
    1507             :         Assert(MyLogicalRepWorker->stream_fileset);
    1508           8 :         wshared->fileset = *MyLogicalRepWorker->stream_fileset;
    1509             :     }
    1510             : 
    1511          32 :     SpinLockRelease(&wshared->mutex);
    1512          32 : }
    1513             : 
    1514             : /*
    1515             :  * Get the fileset state for the current parallel apply worker.
    1516             :  */
    1517             : static PartialFileSetState
    1518         148 : pa_get_fileset_state(void)
    1519             : {
    1520             :     PartialFileSetState fileset_state;
    1521             : 
    1522             :     Assert(am_parallel_apply_worker());
    1523             : 
    1524         148 :     SpinLockAcquire(&MyParallelShared->mutex);
    1525         148 :     fileset_state = MyParallelShared->fileset_state;
    1526         148 :     SpinLockRelease(&MyParallelShared->mutex);
    1527             : 
    1528         148 :     return fileset_state;
    1529             : }
    1530             : 
    1531             : /*
    1532             :  * Helper functions to acquire and release a lock for each stream block.
    1533             :  *
    1534             :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
    1535             :  * stream lock.
    1536             :  *
    1537             :  * Refer to the comments atop this file to see how the stream lock is used.
    1538             :  */
    1539             : void
    1540         552 : pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
    1541             : {
    1542         552 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1543             :                                    PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1544         548 : }
    1545             : 
    1546             : void
    1547         544 : pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
    1548             : {
    1549         544 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1550             :                                      PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1551         544 : }
    1552             : 
    1553             : /*
    1554             :  * Helper functions to acquire and release a lock for each local transaction
    1555             :  * apply.
    1556             :  *
    1557             :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
    1558             :  * transaction lock.
    1559             :  *
    1560             :  * Note that all the callers must pass a remote transaction ID instead of a
    1561             :  * local transaction ID as xid. This is because the local transaction ID is
    1562             :  * only assigned while applying the first change in the parallel apply
    1563             :  * worker, but that first change may be blocked by a concurrently executing
    1564             :  * transaction in another parallel apply worker. We can only communicate the
    1565             :  * local transaction ID to the leader after applying the first change, so the
    1566             :  * leader would not be able to wait on this lock after sending the xact
    1567             :  * finish command.
    1568             :  *
    1569             :  * Refer to the comments atop this file to see how the transaction lock is
    1570             :  * used.
    1571             :  */
    1572             : void
    1573         104 : pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
    1574             : {
    1575         104 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1576             :                                    PARALLEL_APPLY_LOCK_XACT, lockmode);
    1577         102 : }
    1578             : 
    1579             : void
    1580          96 : pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
    1581             : {
    1582          96 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1583             :                                      PARALLEL_APPLY_LOCK_XACT, lockmode);
    1584          96 : }
    1585             : 
    1586             : /*
    1587             :  * Decrement the number of pending streaming blocks and wait on the stream lock
    1588             :  * if there is no pending block available.
    1589             :  */
    1590             : void
    1591         502 : pa_decr_and_wait_stream_block(void)
    1592             : {
    1593             :     Assert(am_parallel_apply_worker());
    1594             : 
    1595             :     /*
    1596             :      * It is only possible to not have any pending stream chunks when we are
    1597             :      * applying spooled messages.
    1598             :      */
    1599         502 :     if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
    1600             :     {
    1601          32 :         if (pa_has_spooled_message_pending())
    1602          32 :             return;
    1603             : 
    1604           0 :         elog(ERROR, "invalid pending streaming chunk 0");
    1605             :     }
    1606             : 
    1607         470 :     if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
    1608             :     {
    1609          52 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
    1610          48 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
    1611             :     }
    1612             : }
    1613             : 
    1614             : /*
    1615             :  * Finish processing the streaming transaction in the leader apply worker.
    1616             :  */
    1617             : void
    1618          50 : pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
    1619             : {
    1620             :     Assert(am_leader_apply_worker());
    1621             : 
    1622             :     /*
    1623             :      * Unlock the shared object lock so that the parallel apply worker can
    1624             :      * continue to receive and apply changes.
    1625             :      */
    1626          50 :     pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
    1627             : 
    1628             :     /*
    1629             :      * Wait for that worker to finish. This is necessary to maintain commit
    1630             :      * order which avoids failures due to transaction dependencies and
    1631             :      * deadlocks.
    1632             :      */
    1633          50 :     pa_wait_for_xact_finish(winfo);
    1634             : 
    1635          48 :     if (!XLogRecPtrIsInvalid(remote_lsn))
    1636          44 :         store_flush_position(remote_lsn, winfo->shared->last_commit_end);
    1637             : 
    1638          48 :     pa_free_worker(winfo);
    1639          48 : }
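
Note on the retry timing in pa_send_data() above: the timeout clock is only started after the first failed attempt and its wait, which is why SHM_SEND_TIMEOUT_MS is defined as 10000 minus one retry interval. The following is a minimal standalone sketch of that arithmetic, not PostgreSQL code. SimQueue, sim_try_send, and send_with_timeout are hypothetical names introduced for illustration, the clock is simulated, and every wait is assumed to last the full retry interval (in the real function a latch set can end the wait early).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define RETRY_INTERVAL_MS 1000
#define SEND_TIMEOUT_MS   (10000 - RETRY_INTERVAL_MS)

/* Hypothetical stand-in for the shared-memory queue and the system clock. */
typedef struct SimQueue
{
    long now_ms;      /* simulated current time in milliseconds */
    int  attempts;    /* number of send attempts made so far */
    int  succeed_on;  /* 1-based attempt that succeeds; 0 means never */
} SimQueue;

/* One non-blocking send attempt: true on success, false if it would block. */
bool
sim_try_send(SimQueue *q)
{
    assert(q != NULL);
    q->attempts++;
    return q->succeed_on > 0 && q->attempts >= q->succeed_on;
}

/*
 * Retry a non-blocking send until it succeeds or the timeout elapses.
 * Mirrors the loop shape of pa_send_data(): wait first, then start or
 * check the timer. Returns false on timeout.
 */
bool
send_with_timeout(SimQueue *q)
{
    long start_ms = -1;     /* -1 means the timer has not started yet */

    for (;;)
    {
        if (sim_try_send(q))
            return true;

        /* Simulated wait: assume the full retry interval always elapses. */
        q->now_ms += RETRY_INTERVAL_MS;

        if (start_ms < 0)
            start_ms = q->now_ms;   /* timer starts after the first wait */
        else if (q->now_ms - start_ms >= SEND_TIMEOUT_MS)
            return false;           /* caller falls back to serializing */
    }
}
```

Under these assumptions a queue that never accepts the message is abandoned after 10 attempts and 10000 simulated milliseconds: one interval before the timer starts plus the 9000 ms timeout. A queue that accepts on the third attempt returns true after two waits.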

Generated by: LCOV version 1.14