LCOV - code coverage report
Current view: top level - src/backend/replication/logical - applyparallelworker.c (source / functions)
Test: PostgreSQL 19devel          Test Date: 2026-04-26 21:16:32
Lines: 89.5 % (368 of 411)        Functions: 100.0 % (36 of 36)

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  * applyparallelworker.c
       3              :  *     Support routines for applying xact by parallel apply worker
       4              :  *
       5              :  * Copyright (c) 2023-2026, PostgreSQL Global Development Group
       6              :  *
       7              :  * IDENTIFICATION
       8              :  *    src/backend/replication/logical/applyparallelworker.c
       9              :  *
       10              :  * This file contains the code to launch, set up, and tear down a parallel
       11              :  * apply worker which receives the changes from the leader apply worker and
       12              :  * invokes routines to apply them on the subscriber database. Additionally,
       13              :  * this file contains routines to support setting up, using, and tearing
       14              :  * down a ParallelApplyWorkerInfo, which is required so that the leader
       15              :  * apply worker and parallel apply workers can communicate with each other.
      16              :  *
       17              :  * The parallel apply workers are assigned (if available) as soon as a
       18              :  * transaction's first stream is received for subscriptions that have set
       19              :  * their 'streaming' option to parallel. The leader apply worker will send
       20              :  * changes to this new worker via shared memory. We keep this worker assigned
       21              :  * until the transaction commit is received and also wait for the worker to
       22              :  * finish at commit. This preserves commit ordering and avoids file I/O in
       23              :  * most cases, although we still need to spill to a file if there is no
       24              :  * worker available. See comments atop logical/worker.c to learn more about
       25              :  * streamed xacts whose changes are spilled to disk. It is important to
       26              :  * maintain commit order to avoid failures due to: (a) transaction
       27              :  * dependencies - say, if we insert a row in the first transaction and then
       28              :  * update it in the second transaction on the publisher, applying both in
       29              :  * parallel on the subscriber can make the update fail; (b) deadlocks -
       30              :  * allowing transactions that update the same set of rows/tables in the
       31              :  * opposite order to be applied in parallel can lead to deadlocks.
      32              :  *
      33              :  * A worker pool is used to avoid restarting workers for each streaming
      34              :  * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
      35              :  * in the ParallelApplyWorkerPool. After successfully launching a new worker,
      36              :  * its information is added to the ParallelApplyWorkerPool. Once the worker
      37              :  * finishes applying the transaction, it is marked as available for re-use.
       38              :  * Before starting a new worker to apply a streaming transaction, we first
       39              :  * check the pool for an available worker. Note that we retain a maximum of
      40              :  * half the max_parallel_apply_workers_per_subscription workers in the pool and
      41              :  * after that, we simply exit the worker after applying the transaction.
      42              :  *
      43              :  * XXX This worker pool threshold is arbitrary and we can provide a GUC
      44              :  * variable for this in the future if required.
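The retention rule above boils down to a threshold check against half of max_parallel_apply_workers_per_subscription. The sketch below is a minimal model of that decision with a hypothetical helper name, not this file's API; the actual decision in this file also accounts for other worker state.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Toy model of the pool retention rule: keep a worker cached only while
 * the pool holds no more than half of the configured per-subscription
 * maximum; otherwise the worker simply exits after its transaction.
 */
static bool
pool_should_retain_worker(int pool_len, int max_workers_per_subscription)
{
    return pool_len <= max_workers_per_subscription / 2;
}
```

With the default max_parallel_apply_workers_per_subscription of 2, this retains at most one idle worker in the pool.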
      45              :  *
      46              :  * The leader apply worker will create a separate dynamic shared memory segment
      47              :  * when each parallel apply worker starts. The reason for this design is that
      48              :  * we cannot predict how many workers will be needed. It may be possible to
      49              :  * allocate enough shared memory in one segment based on the maximum number of
      50              :  * parallel apply workers (max_parallel_apply_workers_per_subscription), but
      51              :  * this would waste memory if no process is actually started.
      52              :  *
      53              :  * The dynamic shared memory segment contains: (a) a shm_mq that is used to
      54              :  * send changes in the transaction from leader apply worker to parallel apply
      55              :  * worker; (b) another shm_mq that is used to send errors (and other messages
      56              :  * reported via elog/ereport) from the parallel apply worker to leader apply
      57              :  * worker; (c) necessary information to be shared among parallel apply workers
      58              :  * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
      59              :  *
      60              :  * Locking Considerations
      61              :  * ----------------------
      62              :  * We have a risk of deadlock due to concurrently applying the transactions in
      63              :  * parallel mode that were independent on the publisher side but became
      64              :  * dependent on the subscriber side due to the different database structures
      65              :  * (like schema of subscription tables, constraints, etc.) on each side. This
      66              :  * can happen even without parallel mode when there are concurrent operations
      67              :  * on the subscriber. In order to detect the deadlocks among leader (LA) and
       68              :  * parallel apply (PA) workers, we use lmgr locks when the PA waits for the
      69              :  * next stream (set of changes) and LA waits for PA to finish the transaction.
      70              :  * An alternative approach could be to not allow parallelism when the schema of
      71              :  * tables is different between the publisher and subscriber but that would be
      72              :  * too restrictive and would require the publisher to send much more
      73              :  * information than it is currently sending.
      74              :  *
      75              :  * Consider a case where the subscribed table does not have a unique key on the
      76              :  * publisher and has a unique key on the subscriber. The deadlock can happen in
      77              :  * the following ways:
      78              :  *
      79              :  * 1) Deadlock between the leader apply worker and a parallel apply worker
      80              :  *
      81              :  * Consider that the parallel apply worker (PA) is executing TX-1 and the
      82              :  * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
      83              :  * Now, LA is waiting for PA because of the unique key constraint of the
      84              :  * subscribed table while PA is waiting for LA to send the next stream of
      85              :  * changes or transaction finish command message.
      86              :  *
      87              :  * In order for lmgr to detect this, we have LA acquire a session lock on the
      88              :  * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
      89              :  * trying to receive the next stream of changes. Specifically, LA will acquire
      90              :  * the lock in AccessExclusive mode before sending the STREAM_STOP and will
       91              :  * release it, if already acquired, after sending the STREAM_START, STREAM_ABORT
      92              :  * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
      93              :  * acquire the lock in AccessShare mode after processing STREAM_STOP and
      94              :  * STREAM_ABORT (for subtransaction) and then release the lock immediately
      95              :  * after acquiring it.
      96              :  *
      97              :  * The lock graph for the above example will look as follows:
      98              :  * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
      99              :  * acquire the stream lock) -> LA
     100              :  *
     101              :  * This way, when PA is waiting for LA for the next stream of changes, we can
     102              :  * have a wait-edge from PA to LA in lmgr, which will make us detect the
     103              :  * deadlock between LA and PA.
     104              :  *
     105              :  * 2) Deadlock between the leader apply worker and parallel apply workers
     106              :  *
     107              :  * This scenario is similar to the first case but TX-1 and TX-2 are executed by
     108              :  * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
     109              :  * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
     110              :  * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
     111              :  * transaction in order to preserve the commit order. There is a deadlock among
     112              :  * the three processes.
     113              :  *
     114              :  * In order for lmgr to detect this, we have PA acquire a session lock (this is
      115              :  * a different lock than the one referred to in the previous case; see
     116              :  * pa_lock_transaction()) on the transaction being applied and have LA wait on
     117              :  * the lock before proceeding in the transaction finish commands. Specifically,
     118              :  * PA will acquire this lock in AccessExclusive mode before executing the first
     119              :  * message of the transaction and release it at the xact end. LA will acquire
     120              :  * this lock in AccessShare mode at transaction finish commands and release it
     121              :  * immediately.
     122              :  *
     123              :  * The lock graph for the above example will look as follows:
     124              :  * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
     125              :  * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
     126              :  * lock) -> LA
     127              :  *
      128              :  * This way, when LA is waiting at a transaction finish command to preserve
      129              :  * the commit order, we will be able to detect a deadlock, if any.
     130              :  *
     131              :  * One might think we can use XactLockTableWait(), but XactLockTableWait()
      132              :  * considers PREPARED TRANSACTION as still in progress, which means the lock
     133              :  * won't be released even after the parallel apply worker has prepared the
     134              :  * transaction.
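The lock graphs in the two cases above are exactly the wait-for cycles that lmgr's deadlock detector looks for. As a self-contained illustration (a toy model, not lmgr code), following each process's single outgoing wait edge and looping reveals the deadlock:

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Toy wait-for graph: waits_for[i] is the process that process i is
 * blocked on, or -1 if process i is runnable. Following the chain from
 * `start` for more than nprocs hops (or arriving back at `start`) means
 * there is a cycle, i.e. a deadlock.
 */
static bool
has_deadlock(const int *waits_for, int nprocs, int start)
{
    int         cur = start;

    for (int hops = 0; hops <= nprocs; hops++)
    {
        cur = waits_for[cur];
        if (cur < 0)
            return false;       /* reached a runnable process: no cycle */
        if (cur == start)
            return true;        /* came back around: deadlock */
    }
    return true;                /* looped longer than any acyclic chain */
}
```

For case 2 above, with LA as process 0 and PA-1/PA-2 as processes 1/2, the edges LA->PA-2, PA-2->PA-1, PA-1->LA form such a cycle; breaking any one edge (e.g. PA-1 becoming runnable) dissolves it.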
     135              :  *
     136              :  * 3) Deadlock when the shm_mq buffer is full
     137              :  *
      138              :  * In the previous scenario (i.e., PA-1 and PA-2 are executing transactions
     139              :  * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
     140              :  * wait to send messages, and this wait doesn't appear in lmgr.
     141              :  *
     142              :  * To avoid this wait, we use a non-blocking write and wait with a timeout. If
      143              :  * the timeout is exceeded, LA will serialize all the pending messages to a
      144              :  * file and indicate to PA-2 that it needs to read that file for the
      145              :  * remaining messages. Then LA will start waiting for the commit as in the
      146              :  * previous case, which will detect a deadlock, if any. See pa_send_data() and
     147              :  * enum TransApplyAction.
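The fallback just described can be sketched as follows. This is a hedged model of the pa_send_data() strategy, with the shared queue reduced to a free-byte counter and the latch wait elided; the real code retries a non-blocking shm_mq send and sleeps on its latch between attempts.

```c
#include <assert.h>

/* Possible outcomes of handing a change to a parallel apply worker. */
typedef enum
{
    SEND_OK,                    /* message fit in the queue */
    SEND_SERIALIZE              /* queue stayed full: spill to a file */
} SendResult;

/*
 * Toy model of a non-blocking write with a bounded retry budget. If the
 * budget runs out, the caller is told to serialize pending messages to a
 * file so the leader never blocks in a way invisible to lmgr.
 */
static SendResult
send_with_timeout(int *queue_free_bytes, int msg_size, int max_retries)
{
    for (int i = 0; i < max_retries; i++)
    {
        if (*queue_free_bytes >= msg_size)
        {
            *queue_free_bytes -= msg_size;
            return SEND_OK;
        }
        /* the real code waits on its latch with a short timeout here */
    }
    return SEND_SERIALIZE;
}
```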
     148              :  *
     149              :  * Lock types
     150              :  * ----------
     151              :  * Both the stream lock and the transaction lock mentioned above are
     152              :  * session-level locks because both locks could be acquired outside the
     153              :  * transaction, and the stream lock in the leader needs to persist across
      154              :  * transaction boundaries, i.e., until the end of the streaming transaction.
     155              :  *-------------------------------------------------------------------------
     156              :  */
     157              : 
     158              : #include "postgres.h"
     159              : 
     160              : #include "libpq/pqformat.h"
     161              : #include "libpq/pqmq.h"
     162              : #include "pgstat.h"
     163              : #include "postmaster/interrupt.h"
     164              : #include "replication/logicallauncher.h"
     165              : #include "replication/logicalworker.h"
     166              : #include "replication/origin.h"
     167              : #include "replication/worker_internal.h"
     168              : #include "storage/ipc.h"
     169              : #include "storage/latch.h"
     170              : #include "storage/lmgr.h"
     171              : #include "storage/proc.h"
     172              : #include "tcop/tcopprot.h"
     173              : #include "utils/inval.h"
     174              : #include "utils/memutils.h"
     175              : #include "utils/syscache.h"
     176              : #include "utils/wait_event.h"
     177              : 
     178              : #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
     179              : 
     180              : /*
      181              :  * DSM keys for the parallel apply worker. Unlike other parallel execution
      182              :  * code, we don't need to worry about DSM keys conflicting with plan_node_id,
      183              :  * so we can use small integers.
     184              :  */
     185              : #define PARALLEL_APPLY_KEY_SHARED       1
     186              : #define PARALLEL_APPLY_KEY_MQ           2
     187              : #define PARALLEL_APPLY_KEY_ERROR_QUEUE  3
     188              : 
     189              : /* Queue size of DSM, 16 MB for now. */
     190              : #define DSM_QUEUE_SIZE  (16 * 1024 * 1024)
     191              : 
     192              : /*
     193              :  * Error queue size of DSM. It is desirable to make it large enough that a
     194              :  * typical ErrorResponse can be sent without blocking. That way, a worker that
     195              :  * errors out can write the whole message into the queue and terminate without
      196              :  * waiting for the leader apply worker.
     197              :  */
     198              : #define DSM_ERROR_QUEUE_SIZE            (16 * 1024)
     199              : 
     200              : /*
      201              :  * There are three fields in each message received by the parallel apply
      202              :  * worker: start_lsn, end_lsn and send_time. Because these statistics have
      203              :  * already been updated by the leader apply worker, the parallel apply
      204              :  * worker can ignore these fields (see function LogicalRepApplyLoop).
     205              :  */
     206              : #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
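The arithmetic behind SIZE_STATS_MESSAGE can be checked in isolation. The sketch below uses stand-in typedefs (both PostgreSQL types are 8-byte integers on supported platforms) and an illustrative helper that is not this file's API:

```c
#include <assert.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL types used in the real definition. */
typedef uint64_t XLogRecPtr;    /* 8-byte WAL location */
typedef int64_t TimestampTz;    /* 8-byte timestamp */

/* Two LSNs (start_lsn, end_lsn) plus one send_time: 24 bytes to skip. */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

/*
 * Illustrative helper: return a pointer just past the stats header that
 * the parallel apply worker ignores in each streamed message.
 */
static const char *
skip_stats_header(const char *msg)
{
    return msg + SIZE_STATS_MESSAGE;
}
```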
     207              : 
     208              : /*
     209              :  * The type of session-level lock on a transaction being applied on a logical
     210              :  * replication subscriber.
     211              :  */
     212              : #define PARALLEL_APPLY_LOCK_STREAM  0
     213              : #define PARALLEL_APPLY_LOCK_XACT    1
     214              : 
     215              : /*
     216              :  * Hash table entry to map xid to the parallel apply worker state.
     217              :  */
     218              : typedef struct ParallelApplyWorkerEntry
     219              : {
     220              :     TransactionId xid;          /* Hash key -- must be first */
     221              :     ParallelApplyWorkerInfo *winfo;
     222              : } ParallelApplyWorkerEntry;
     223              : 
     224              : /*
     225              :  * A hash table used to cache the state of streaming transactions being applied
     226              :  * by the parallel apply workers.
     227              :  */
     228              : static HTAB *ParallelApplyTxnHash = NULL;
     229              : 
     230              : /*
      231              :  * A list (pool) of active parallel apply workers. The information for
      232              :  * the new worker is added to the list after successfully launching it. The
      233              :  * list entry is removed if there are already enough workers in the worker
      234              :  * pool at the end of the transaction. For more information about the worker
      235              :  * pool, see comments atop this file.
     236              :  */
     237              : static List *ParallelApplyWorkerPool = NIL;
     238              : 
     239              : /*
     240              :  * Information shared between leader apply worker and parallel apply worker.
     241              :  */
     242              : ParallelApplyWorkerShared *MyParallelShared = NULL;
     243              : 
     244              : /*
     245              :  * Is there a message sent by a parallel apply worker that the leader apply
     246              :  * worker needs to receive?
     247              :  */
     248              : volatile sig_atomic_t ParallelApplyMessagePending = false;
     249              : 
     250              : /*
     251              :  * Cache the parallel apply worker information required for applying the
     252              :  * current streaming transaction. It is used to save the cost of searching the
     253              :  * hash table when applying the changes between STREAM_START and STREAM_STOP.
     254              :  */
     255              : static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
     256              : 
     257              : /* A list to maintain subtransactions, if any. */
     258              : static List *subxactlist = NIL;
     259              : 
     260              : static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
     261              : static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
     262              : static PartialFileSetState pa_get_fileset_state(void);
     263              : 
     264              : /*
     265              :  * Returns true if it is OK to start a parallel apply worker, false otherwise.
     266              :  */
     267              : static bool
     268           88 : pa_can_start(void)
     269              : {
     270              :     /* Only leader apply workers can start parallel apply workers. */
     271           88 :     if (!am_leader_apply_worker())
     272           30 :         return false;
     273              : 
     274              :     /*
      275              :      * Check for any change in the subscription parameters so that a change
      276              :      * does not go unnoticed for a long time. This can happen when there is
      277              :      * a constant flow of streaming transactions that are all handled by
      278              :      * parallel apply workers.
      279              :      *
      280              :      * It is better to do this before the checks below so that the latest
      281              :      * subscription values can be used for the checks.
     282              :      */
     283           58 :     maybe_reread_subscription();
     284              : 
     285              :     /*
     286              :      * Don't start a new parallel apply worker if the subscription is not
     287              :      * using parallel streaming mode, or if the publisher does not support
     288              :      * parallel apply.
     289              :      */
     290           58 :     if (!MyLogicalRepWorker->parallel_apply)
     291           28 :         return false;
     292              : 
     293              :     /*
      294              :      * Don't start a new parallel worker if the user has set skiplsn, as it's
     295              :      * possible that they want to skip the streaming transaction. For
     296              :      * streaming transactions, we need to serialize the transaction to a file
     297              :      * so that we can get the last LSN of the transaction to judge whether to
     298              :      * skip before starting to apply the change.
     299              :      *
     300              :      * One might think that we could allow parallelism if the first lsn of the
     301              :      * transaction is greater than skiplsn, but we don't send it with the
     302              :      * STREAM START message, and it doesn't seem worth sending the extra eight
     303              :      * bytes with the STREAM START to enable parallelism for this case.
     304              :      */
     305           30 :     if (XLogRecPtrIsValid(MySubscription->skiplsn))
     306            0 :         return false;
     307              : 
     308              :     /*
     309              :      * For streaming transactions that are being applied using a parallel
     310              :      * apply worker, we cannot decide whether to apply the change for a
     311              :      * relation that is not in the READY state (see
     312              :      * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
     313              :      * time. So, we don't start the new parallel apply worker in this case.
     314              :      */
     315           30 :     if (!AllTablesyncsReady())
     316            0 :         return false;
     317              : 
     318           30 :     return true;
     319              : }
     320              : 
     321              : /*
     322              :  * Set up a dynamic shared memory segment.
     323              :  *
     324              :  * We set up a control region that contains a fixed-size worker info
     325              :  * (ParallelApplyWorkerShared), a message queue, and an error queue.
     326              :  *
     327              :  * Returns true on success, false on failure.
     328              :  */
     329              : static bool
     330           13 : pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
     331              : {
     332              :     shm_toc_estimator e;
     333              :     Size        segsize;
     334              :     dsm_segment *seg;
     335              :     shm_toc    *toc;
     336              :     ParallelApplyWorkerShared *shared;
     337              :     shm_mq     *mq;
     338           13 :     Size        queue_size = DSM_QUEUE_SIZE;
     339           13 :     Size        error_queue_size = DSM_ERROR_QUEUE_SIZE;
     340              : 
     341              :     /*
     342              :      * Estimate how much shared memory we need.
     343              :      *
     344              :      * Because the TOC machinery may choose to insert padding of oddly-sized
     345              :      * requests, we must estimate each chunk separately.
     346              :      *
     347              :      * We need one key to register the location of the header, and two other
     348              :      * keys to track the locations of the message queue and the error message
     349              :      * queue.
     350              :      */
     351           13 :     shm_toc_initialize_estimator(&e);
     352           13 :     shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
     353           13 :     shm_toc_estimate_chunk(&e, queue_size);
     354           13 :     shm_toc_estimate_chunk(&e, error_queue_size);
     355              : 
     356           13 :     shm_toc_estimate_keys(&e, 3);
     357           13 :     segsize = shm_toc_estimate(&e);
     358              : 
     359              :     /* Create the shared memory segment and establish a table of contents. */
      360           13 :     seg = dsm_create(segsize, 0);
     361           13 :     if (!seg)
     362            0 :         return false;
     363              : 
     364           13 :     toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
     365              :                          segsize);
     366              : 
     367              :     /* Set up the header region. */
     368           13 :     shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
     369           13 :     SpinLockInit(&shared->mutex);
     370              : 
     371           13 :     shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     372           13 :     pg_atomic_init_u32(&(shared->pending_stream_count), 0);
     373           13 :     shared->last_commit_end = InvalidXLogRecPtr;
     374           13 :     shared->fileset_state = FS_EMPTY;
     375              : 
     376           13 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
     377              : 
     378              :     /* Set up message queue for the worker. */
     379           13 :     mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
     380           13 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
     381           13 :     shm_mq_set_sender(mq, MyProc);
     382              : 
     383              :     /* Attach the queue. */
     384           13 :     winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
     385              : 
     386              :     /* Set up error queue for the worker. */
     387           13 :     mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
     388              :                        error_queue_size);
     389           13 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
     390           13 :     shm_mq_set_receiver(mq, MyProc);
     391              : 
     392              :     /* Attach the queue. */
     393           13 :     winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
     394              : 
     395              :     /* Return results to caller. */
     396           13 :     winfo->dsm_seg = seg;
     397           13 :     winfo->shared = shared;
     398              : 
     399           13 :     return true;
     400              : }
     401              : 
     402              : /*
      403              :  * Try to get a parallel apply worker from the pool. If none is available,
     404              :  * start a new one.
     405              :  */
     406              : static ParallelApplyWorkerInfo *
     407           30 : pa_launch_parallel_worker(void)
     408              : {
     409              :     MemoryContext oldcontext;
     410              :     bool        launched;
     411              :     ParallelApplyWorkerInfo *winfo;
     412              :     ListCell   *lc;
     413              : 
     414              :     /* Try to get an available parallel apply worker from the worker pool. */
     415           32 :     foreach(lc, ParallelApplyWorkerPool)
     416              :     {
     417           19 :         winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     418              : 
     419           19 :         if (!winfo->in_use)
     420           17 :             return winfo;
     421              :     }
     422              : 
     423              :     /*
     424              :      * Start a new parallel apply worker.
     425              :      *
     426              :      * The worker info can be used for the lifetime of the worker process, so
     427              :      * create it in a permanent context.
     428              :      */
     429           13 :     oldcontext = MemoryContextSwitchTo(ApplyContext);
     430              : 
     431           13 :     winfo = palloc0_object(ParallelApplyWorkerInfo);
     432              : 
     433              :     /* Setup shared memory. */
     434           13 :     if (!pa_setup_dsm(winfo))
     435              :     {
     436            0 :         MemoryContextSwitchTo(oldcontext);
     437            0 :         pfree(winfo);
     438            0 :         return NULL;
     439              :     }
     440              : 
     441           13 :     launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
     442           13 :                                         MyLogicalRepWorker->dbid,
     443           13 :                                         MySubscription->oid,
     444           13 :                                         MySubscription->name,
     445           13 :                                         MyLogicalRepWorker->userid,
     446              :                                         InvalidOid,
     447              :                                         dsm_segment_handle(winfo->dsm_seg),
     448              :                                         false);
     449              : 
     450           13 :     if (launched)
     451              :     {
     452           13 :         ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
     453              :     }
     454              :     else
     455              :     {
     456            0 :         pa_free_worker_info(winfo);
     457            0 :         winfo = NULL;
     458              :     }
     459              : 
     460           13 :     MemoryContextSwitchTo(oldcontext);
     461              : 
     462           13 :     return winfo;
     463              : }
     464              : 
     465              : /*
     466              :  * Allocate a parallel apply worker that will be used for the specified xid.
     467              :  *
      468              :  * We first try to get an available worker from the pool; if none is
      469              :  * available, we launch a new one. On successful allocation, remember the worker
     470              :  * information in the hash table so that we can get it later for processing the
     471              :  * streaming changes.
     472              :  */
     473              : void
     474           88 : pa_allocate_worker(TransactionId xid)
     475              : {
     476              :     bool        found;
     477           88 :     ParallelApplyWorkerInfo *winfo = NULL;
     478              :     ParallelApplyWorkerEntry *entry;
     479              : 
     480           88 :     if (!pa_can_start())
     481           58 :         return;
     482              : 
     483           30 :     winfo = pa_launch_parallel_worker();
     484           30 :     if (!winfo)
     485            0 :         return;
     486              : 
     487              :     /* First time through, initialize parallel apply worker state hashtable. */
     488           30 :     if (!ParallelApplyTxnHash)
     489              :     {
     490              :         HASHCTL     ctl;
     491              : 
     492          110 :         MemSet(&ctl, 0, sizeof(ctl));
     493           10 :         ctl.keysize = sizeof(TransactionId);
     494           10 :         ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
     495           10 :         ctl.hcxt = ApplyContext;
     496              : 
     497           10 :         ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
     498              :                                            16, &ctl,
     499              :                                            HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     500              :     }
     501              : 
     502              :     /* Create an entry for the requested transaction. */
     503           30 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
     504           30 :     if (found)
     505            0 :         elog(ERROR, "hash table corrupted");
     506              : 
     507              :     /* Update the transaction information in shared memory. */
     508           30 :     SpinLockAcquire(&winfo->shared->mutex);
     509           30 :     winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     510           30 :     winfo->shared->xid = xid;
     511           30 :     SpinLockRelease(&winfo->shared->mutex);
     512              : 
     513           30 :     winfo->in_use = true;
     514           30 :     winfo->serialize_changes = false;
     515           30 :     entry->winfo = winfo;
     516              : }
     517              : 
     518              : /*
     519              :  * Find the assigned worker for the given transaction, if any.
     520              :  */
     521              : ParallelApplyWorkerInfo *
     522       278118 : pa_find_worker(TransactionId xid)
     523              : {
     524              :     bool        found;
     525              :     ParallelApplyWorkerEntry *entry;
     526              : 
     527       278118 :     if (!TransactionIdIsValid(xid))
     528       100929 :         return NULL;
     529              : 
     530       177189 :     if (!ParallelApplyTxnHash)
     531       103241 :         return NULL;
     532              : 
     533              :     /* Return the cached parallel apply worker if valid. */
     534        73948 :     if (stream_apply_worker)
     535        73654 :         return stream_apply_worker;
     536              : 
     537              :     /* Find an entry for the requested transaction. */
     538          294 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
     539          294 :     if (found)
     540              :     {
     541              :         /* The worker must not have exited.  */
     542              :         Assert(entry->winfo->in_use);
     543          294 :         return entry->winfo;
     544              :     }
     545              : 
     546            0 :     return NULL;
     547              : }
     548              : 
     549              : /*
     550              :  * Makes the worker available for reuse.
     551              :  *
     552              :  * This removes the parallel apply worker entry from the hash table so that it
     553              :  * can't be used. If there are enough workers in the pool, it stops the worker
     554              :  * and frees the corresponding info. Otherwise it just marks the worker as
     555              :  * available for reuse.
     556              :  *
     557              :  * For more information about the worker pool, see comments atop this file.
     558              :  */
     559              : static void
     560           25 : pa_free_worker(ParallelApplyWorkerInfo *winfo)
     561              : {
     562              :     Assert(!am_parallel_apply_worker());
     563              :     Assert(winfo->in_use);
     564              :     Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
     565              : 
     566           25 :     if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
     567            0 :         elog(ERROR, "hash table corrupted");
     568              : 
     569              :     /*
     570              :      * Stop the worker if there are enough workers in the pool.
     571              :      *
     572              :      * XXX Additionally, we also stop the worker if the leader apply worker
     573              :  * serialized part of the transaction data due to a send timeout. This is
     574              :      * because the message could be partially written to the queue and there
     575              :      * is no way to clean the queue other than resending the message until it
     576              :      * succeeds. Instead of trying to send the data which anyway would have
     577              :      * been serialized and then letting the parallel apply worker deal with
     578              :      * the spurious message, we stop the worker.
     579              :      */
     580           25 :     if (winfo->serialize_changes ||
     581           21 :         list_length(ParallelApplyWorkerPool) >
     582           21 :         (max_parallel_apply_workers_per_subscription / 2))
     583              :     {
     584            5 :         logicalrep_pa_worker_stop(winfo);
     585            5 :         pa_free_worker_info(winfo);
     586              : 
     587            5 :         return;
     588              :     }
     589              : 
     590           20 :     winfo->in_use = false;
     591           20 :     winfo->serialize_changes = false;
     592              : }
     593              : 
     594              : /*
     595              :  * Free the parallel apply worker information and unlink the files with
     596              :  * serialized changes if any.
     597              :  */
     598              : static void
     599            5 : pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
     600              : {
     601              :     Assert(winfo);
     602              : 
     603            5 :     if (winfo->mq_handle)
     604            5 :         shm_mq_detach(winfo->mq_handle);
     605              : 
     606            5 :     if (winfo->error_mq_handle)
     607            0 :         shm_mq_detach(winfo->error_mq_handle);
     608              : 
     609              :     /* Unlink the files with serialized changes. */
     610            5 :     if (winfo->serialize_changes)
     611            4 :         stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
     612              : 
     613            5 :     if (winfo->dsm_seg)
     614            5 :         dsm_detach(winfo->dsm_seg);
     615              : 
     616              :     /* Remove from the worker pool. */
     617            5 :     ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
     618              : 
     619            5 :     pfree(winfo);
     620            5 : }
     621              : 
     622              : /*
     623              :  * Detach the error queue for all parallel apply workers.
     624              :  */
     625              : void
     626          397 : pa_detach_all_error_mq(void)
     627              : {
     628              :     ListCell   *lc;
     629              : 
     630          405 :     foreach(lc, ParallelApplyWorkerPool)
     631              :     {
     632            8 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     633              : 
     634            8 :         if (winfo->error_mq_handle)
     635              :         {
     636            8 :             shm_mq_detach(winfo->error_mq_handle);
     637            8 :             winfo->error_mq_handle = NULL;
     638              :         }
     639              :     }
     640          397 : }
     641              : 
     642              : /*
     643              :  * Check if there are any pending spooled messages.
     644              :  */
     645              : static bool
     646           16 : pa_has_spooled_message_pending(void)
     647              : {
     648              :     PartialFileSetState fileset_state;
     649              : 
     650           16 :     fileset_state = pa_get_fileset_state();
     651              : 
     652           16 :     return (fileset_state != FS_EMPTY);
     653              : }
     654              : 
     655              : /*
     656              :  * Replay the spooled messages once the leader apply worker has finished
     657              :  * serializing changes to the file.
     658              :  *
     659              :  * Returns false if there aren't any pending spooled messages, true otherwise.
     660              :  */
     661              : static bool
     662           64 : pa_process_spooled_messages_if_required(void)
     663              : {
     664              :     PartialFileSetState fileset_state;
     665              : 
     666           64 :     fileset_state = pa_get_fileset_state();
     667              : 
     668           64 :     if (fileset_state == FS_EMPTY)
     669           56 :         return false;
     670              : 
     671              :     /*
     672              :      * If the leader apply worker is busy serializing the partial changes then
     673              :      * acquire the stream lock now and wait for the leader worker to finish
     674              :      * serializing the changes. Otherwise, the parallel apply worker won't get
     675              :      * a chance to receive a STREAM_STOP (and acquire the stream lock) until
     676              :  * the leader has serialized all changes, which can lead to an undetected
     677              :      * deadlock.
     678              :      *
     679              :      * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
     680              :      * worker has finished serializing the changes.
     681              :      */
     682            8 :     if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
     683              :     {
     684            0 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
     685            0 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
     686              : 
     687            0 :         fileset_state = pa_get_fileset_state();
     688              :     }
     689              : 
     690              :     /*
     691              :      * We cannot read the file immediately after the leader has serialized all
     692              :      * changes to the file because there may still be messages in the memory
     693              :      * queue. We will apply all spooled messages the next time we call this
     694              :      * function and that will ensure there are no messages left in the memory
     695              :      * queue.
     696              :      */
     697            8 :     if (fileset_state == FS_SERIALIZE_DONE)
     698              :     {
     699            4 :         pa_set_fileset_state(MyParallelShared, FS_READY);
     700              :     }
     701            4 :     else if (fileset_state == FS_READY)
     702              :     {
     703            4 :         apply_spooled_messages(&MyParallelShared->fileset,
     704            4 :                                MyParallelShared->xid,
     705              :                                InvalidXLogRecPtr);
     706            4 :         pa_set_fileset_state(MyParallelShared, FS_EMPTY);
     707              :     }
     708              : 
     709            8 :     return true;
     710              : }
     711              : 
     712              : /*
     713              :  * Interrupt handler for main loop of parallel apply worker.
     714              :  */
     715              : static void
     716        64185 : ProcessParallelApplyInterrupts(void)
     717              : {
     718        64185 :     CHECK_FOR_INTERRUPTS();
     719              : 
     720        64181 :     if (ShutdownRequestPending)
     721              :     {
     722            5 :         ereport(LOG,
     723              :                 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
     724              :                         MySubscription->name)));
     725              : 
     726            5 :         proc_exit(0);
     727              :     }
     728              : 
     729        64176 :     if (ConfigReloadPending)
     730              :     {
     731            4 :         ConfigReloadPending = false;
     732            4 :         ProcessConfigFile(PGC_SIGHUP);
     733              :     }
     734        64176 : }
     735              : 
     736              : /* Parallel apply worker main loop. */
     737              : static void
     738           13 : LogicalParallelApplyLoop(shm_mq_handle *mqh)
     739              : {
     740              :     shm_mq_result shmq_res;
     741              :     ErrorContextCallback errcallback;
     742           13 :     MemoryContext oldcxt = CurrentMemoryContext;
     743              : 
     744              :     /*
     745              :      * Init the ApplyMessageContext which we clean up after each replication
     746              :      * protocol message.
     747              :      */
     748           13 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
     749              :                                                 "ApplyMessageContext",
     750              :                                                 ALLOCSET_DEFAULT_SIZES);
     751              : 
     752              :     /*
     753              :      * Push apply error context callback. Fields will be filled while applying
     754              :      * a change.
     755              :      */
     756           13 :     errcallback.callback = apply_error_callback;
     757           13 :     errcallback.previous = error_context_stack;
     758           13 :     error_context_stack = &errcallback;
     759              : 
     760              :     for (;;)
     761        64172 :     {
     762              :         void       *data;
     763              :         Size        len;
     764              : 
     765        64185 :         ProcessParallelApplyInterrupts();
     766              : 
     767              :         /* Ensure we are reading the data into our memory context. */
     768        64176 :         MemoryContextSwitchTo(ApplyMessageContext);
     769              : 
     770        64176 :         shmq_res = shm_mq_receive(mqh, &len, &data, true);
     771              : 
     772        64176 :         if (shmq_res == SHM_MQ_SUCCESS)
     773              :         {
     774              :             StringInfoData s;
     775              :             int         c;
     776              : 
     777        64112 :             if (len == 0)
     778            0 :                 elog(ERROR, "invalid message length");
     779              : 
     780        64112 :             initReadOnlyStringInfo(&s, data, len);
     781              : 
     782              :             /*
     783              :              * The first byte of messages sent from leader apply worker to
     784              :              * parallel apply workers can only be PqReplMsg_WALData.
     785              :              */
     786        64112 :             c = pq_getmsgbyte(&s);
     787        64112 :             if (c != PqReplMsg_WALData)
     788            0 :                 elog(ERROR, "unexpected message \"%c\"", c);
     789              : 
     790              :             /*
     791              :              * Ignore statistics fields that have been updated by the leader
     792              :              * apply worker.
     793              :              *
     794              :              * XXX We can avoid sending the statistics fields from the leader
     795              :              * apply worker but for that, it needs to rebuild the entire
     796              :              * message by removing these fields which could be more work than
     797              :              * simply ignoring these fields in the parallel apply worker.
     798              :              */
     799        64112 :             s.cursor += SIZE_STATS_MESSAGE;
     800              : 
     801        64112 :             apply_dispatch(&s);
     802              :         }
     803           64 :         else if (shmq_res == SHM_MQ_WOULD_BLOCK)
     804              :         {
     805              :             /* Replay the changes from the file, if any. */
     806           64 :             if (!pa_process_spooled_messages_if_required())
     807              :             {
     808              :                 int         rc;
     809              : 
     810              :                 /* Wait for more work. */
     811           56 :                 rc = WaitLatch(MyLatch,
     812              :                                WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     813              :                                1000L,
     814              :                                WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
     815              : 
     816           56 :                 if (rc & WL_LATCH_SET)
     817           54 :                     ResetLatch(MyLatch);
     818              : 
     819              :                 /*
     820              :                  * Force stats reporting to avoid long delays. There can be
     821              :                  * long idle gaps before the leader assigns the next
     822              :                  * transaction, and the only opportunity to report stats
     823              :                  * during such gaps is here.
     824              :                  */
     825           56 :                 if ((rc & WL_TIMEOUT) && !IsTransactionState())
     826            2 :                     pgstat_report_stat(true);
     827              :             }
     828              :         }
     829              :         else
     830              :         {
     831              :             Assert(shmq_res == SHM_MQ_DETACHED);
     832              : 
     833            0 :             ereport(ERROR,
     834              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     835              :                      errmsg("lost connection to the logical replication apply worker")));
     836              :         }
     837              : 
     838        64172 :         MemoryContextReset(ApplyMessageContext);
     839        64172 :         MemoryContextSwitchTo(oldcxt);
     840              :     }
     841              : 
     842              :     /* Pop the error context stack. */
     843              :     error_context_stack = errcallback.previous;
     844              : 
     845              :     MemoryContextSwitchTo(oldcxt);
     846              : }
     847              : 
     848              : /*
     849              :  * Make sure the leader apply worker tries to read from our error queue one more
     850              :  * time. This guards against the case where we exit uncleanly without sending
     851              :  * an ErrorResponse, for example because some code calls proc_exit directly.
     852              :  *
     853              :  * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
     854              :  * if any. See ParallelWorkerShutdown for details.
     855              :  */
     856              : static void
     857           13 : pa_shutdown(int code, Datum arg)
     858              : {
     859           13 :     SendProcSignal(MyLogicalRepWorker->leader_pid,
     860              :                    PROCSIG_PARALLEL_APPLY_MESSAGE,
     861              :                    INVALID_PROC_NUMBER);
     862              : 
     863           13 :     dsm_detach((dsm_segment *) DatumGetPointer(arg));
     864           13 : }
     865              : 
     866              : /*
     867              :  * Parallel apply worker entry point.
     868              :  */
     869              : void
     870           13 : ParallelApplyWorkerMain(Datum main_arg)
     871              : {
     872              :     ParallelApplyWorkerShared *shared;
     873              :     dsm_handle  handle;
     874              :     dsm_segment *seg;
     875              :     shm_toc    *toc;
     876              :     shm_mq     *mq;
     877              :     shm_mq_handle *mqh;
     878              :     shm_mq_handle *error_mqh;
     879              :     ReplOriginId originid;
     880           13 :     int         worker_slot = DatumGetInt32(main_arg);
     881              :     char        originname[NAMEDATALEN];
     882              : 
     883           13 :     InitializingApplyWorker = true;
     884              : 
     885              :     /*
     886              :      * Setup signal handling.
     887              :      *
     888              :      * Note: We intentionally used SIGUSR2 to trigger a graceful shutdown
     889              :      * initiated by the leader apply worker. This helps to differentiate it
     890              :      * from the case where we abort the current transaction and exit on
     891              :      * receiving SIGTERM.
     892              :      */
     893           13 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     894           13 :     pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
     895           13 :     BackgroundWorkerUnblockSignals();
     896              : 
     897              :     /*
     898              :      * Attach to the dynamic shared memory segment for the parallel apply, and
     899              :      * find its table of contents.
     900              :      *
     901              :  * Like parallel query, we don't need a resource owner by this time. See
     902              :      * ParallelWorkerMain.
     903              :      */
     904           13 :     memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
     905           13 :     seg = dsm_attach(handle);
     906           13 :     if (!seg)
     907            0 :         ereport(ERROR,
     908              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     909              :                  errmsg("could not map dynamic shared memory segment")));
     910              : 
     911           13 :     toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
     912           13 :     if (!toc)
     913            0 :         ereport(ERROR,
     914              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     915              :                  errmsg("invalid magic number in dynamic shared memory segment")));
     916              : 
     917              :     /* Look up the shared information. */
     918           13 :     shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
     919           13 :     MyParallelShared = shared;
     920              : 
     921              :     /*
     922              :      * Attach to the message queue.
     923              :      */
     924           13 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
     925           13 :     shm_mq_set_receiver(mq, MyProc);
     926           13 :     mqh = shm_mq_attach(mq, seg, NULL);
     927              : 
     928              :     /*
     929              :      * Primary initialization is complete. Now, we can attach to our slot.
     930              :      * This is to ensure that the leader apply worker does not write data to
     931              :      * the uninitialized memory queue.
     932              :      */
     933           13 :     logicalrep_worker_attach(worker_slot);
     934              : 
     935              :     /*
     936              :      * Register the shutdown callback after we are attached to the worker
     937              :      * slot. This is to ensure that MyLogicalRepWorker remains valid when this
     938              :      * callback is invoked.
     939              :      */
     940           13 :     before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
     941              : 
     942           13 :     SpinLockAcquire(&MyParallelShared->mutex);
     943           13 :     MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
     944           13 :     MyParallelShared->logicalrep_worker_slot_no = worker_slot;
     945           13 :     SpinLockRelease(&MyParallelShared->mutex);
     946              : 
     947              :     /*
     948              :      * Attach to the error queue.
     949              :      */
     950           13 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
     951           13 :     shm_mq_set_sender(mq, MyProc);
     952           13 :     error_mqh = shm_mq_attach(mq, seg, NULL);
     953              : 
     954           13 :     pq_redirect_to_shm_mq(seg, error_mqh);
     955           13 :     pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
     956              :                            INVALID_PROC_NUMBER);
     957              : 
     958           13 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
     959           13 :         MyLogicalRepWorker->reply_time = 0;
     960              : 
     961           13 :     InitializeLogRepWorker();
     962              : 
     963           13 :     InitializingApplyWorker = false;
     964              : 
     965              :     /* Setup replication origin tracking. */
     966           13 :     StartTransactionCommand();
     967           13 :     ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
     968              :                                        originname, sizeof(originname));
     969           13 :     originid = replorigin_by_name(originname, false);
     970              : 
     971              :     /*
     972              :      * The parallel apply worker doesn't need to monopolize this replication
     973              :      * origin which was already acquired by its leader process.
     974              :      */
     975           13 :     replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
     976           13 :     replorigin_xact_state.origin = originid;
     977           13 :     CommitTransactionCommand();
     978              : 
     979              :     /*
     980              :      * Setup callback for syscache so that we know when something changes in
     981              :      * the subscription relation state.
     982              :      */
     983           13 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
     984              :                                   InvalidateSyncingRelStates,
     985              :                                   (Datum) 0);
     986              : 
     987           13 :     set_apply_error_context_origin(originname);
     988              : 
     989           13 :     LogicalParallelApplyLoop(mqh);
     990              : 
     991              :     /*
     992              :  * The parallel apply worker must not get here: it will only stop when it
     993              :  * receives a SIGTERM or SIGUSR2 from the leader, or SIGINT from itself, or
     994              :  * when there is an error. None of these cases will allow the code to reach
     995              :  * here.
     996              :      */
     997              :     Assert(false);
     998            0 : }
     999              : 
    1000              : /*
    1001              :  * Handle receipt of an interrupt indicating a parallel apply worker message.
    1002              :  *
    1003              :  * Note: this is called within a signal handler! All we can do is set a flag
    1004              :  * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
    1005              :  * ProcessParallelApplyMessages().
    1006              :  */
    1007              : void
    1008           17 : HandleParallelApplyMessageInterrupt(void)
    1009              : {
    1010           17 :     InterruptPending = true;
    1011           17 :     ParallelApplyMessagePending = true;
    1012              :     /* latch will be set by procsignal_sigusr1_handler */
    1013           17 : }
    1014              : 
    1015              : /*
    1016              :  * Process a single protocol message received from a single parallel apply
    1017              :  * worker.
    1018              :  */
    1019              : static void
    1020            3 : ProcessParallelApplyMessage(StringInfo msg)
    1021              : {
    1022              :     char        msgtype;
    1023              : 
    1024            3 :     msgtype = pq_getmsgbyte(msg);
    1025              : 
    1026            3 :     switch (msgtype)
    1027              :     {
    1028            3 :         case PqMsg_ErrorResponse:
    1029              :             {
    1030              :                 ErrorData   edata;
    1031              : 
    1032              :                 /* Parse ErrorResponse. */
    1033            3 :                 pq_parse_errornotice(msg, &edata);
    1034              : 
    1035              :                 /*
    1036              :                  * If desired, add a context line to show that this is a
    1037              :                  * message propagated from a parallel apply worker. Otherwise,
    1038              :                  * it can sometimes be confusing to understand what actually
    1039              :                  * happened.
    1040              :                  */
    1041            3 :                 if (edata.context)
    1042            3 :                     edata.context = psprintf("%s\n%s", edata.context,
    1043              :                                              _("logical replication parallel apply worker"));
    1044              :                 else
    1045            0 :                     edata.context = pstrdup(_("logical replication parallel apply worker"));
    1046              : 
    1047              :                 /*
    1048              :                  * Context beyond that should use the error context callbacks
    1049              :                  * that were in effect in LogicalRepApplyLoop().
    1050              :                  */
    1051            3 :                 error_context_stack = apply_error_context_stack;
    1052              : 
    1053              :                 /*
    1054              :                  * The actual error must have been reported by the parallel
    1055              :                  * apply worker.
    1056              :                  */
    1057            3 :                 ereport(ERROR,
    1058              :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1059              :                          errmsg("logical replication parallel apply worker exited due to error"),
    1060              :                          errcontext("%s", edata.context)));
    1061              :             }
    1062              : 
    1063              :             /*
    1064              :              * Don't need to do anything about NoticeResponse and
    1065              :              * NotificationResponse as the logical replication worker doesn't
    1066              :              * need to send messages to the client.
    1067              :              */
    1068            0 :         case PqMsg_NoticeResponse:
    1069              :         case PqMsg_NotificationResponse:
    1070            0 :             break;
    1071              : 
    1072            0 :         default:
    1073            0 :             elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
    1074              :                  msgtype, msg->len);
    1075              :     }
    1076            0 : }
    1077              : 
    1078              : /*
    1079              :  * Handle any queued protocol messages received from parallel apply workers.
    1080              :  */
    1081              : void
    1082            8 : ProcessParallelApplyMessages(void)
    1083              : {
    1084              :     ListCell   *lc;
    1085              :     MemoryContext oldcontext;
    1086              : 
    1087              :     static MemoryContext hpam_context = NULL;
    1088              : 
    1089              :     /*
    1090              :      * This is invoked from ProcessInterrupts(), and since some of the
    1091              :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
    1092              :      * for recursive calls if more signals are received while this runs. It's
    1093              :      * unclear that recursive entry would be safe, and it doesn't seem useful
    1094              :      * even if it is safe, so let's block interrupts until done.
    1095              :      */
    1096            8 :     HOLD_INTERRUPTS();
    1097              : 
    1098              :     /*
    1099              :      * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
    1100              :      * don't want to risk leaking data into long-lived contexts, so let's do
    1101              :      * our work here in a private context that we can reset on each use.
    1102              :      */
    1103            8 :     if (!hpam_context)          /* first time through? */
    1104            7 :         hpam_context = AllocSetContextCreate(TopMemoryContext,
    1105              :                                              "ProcessParallelApplyMessages",
    1106              :                                              ALLOCSET_DEFAULT_SIZES);
    1107              :     else
    1108            1 :         MemoryContextReset(hpam_context);
    1109              : 
    1110            8 :     oldcontext = MemoryContextSwitchTo(hpam_context);
    1111              : 
    1112            8 :     ParallelApplyMessagePending = false;
    1113              : 
    1114           14 :     foreach(lc, ParallelApplyWorkerPool)
    1115              :     {
    1116              :         shm_mq_result res;
    1117              :         Size        nbytes;
    1118              :         void       *data;
    1119            9 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
    1120              : 
    1121              :         /*
    1122              :          * The leader will detach from the error queue and set it to NULL
    1123              :          * before preparing to stop all parallel apply workers, so we don't
    1124              :          * need to handle error messages anymore. See
    1125              :          * logicalrep_worker_detach.
    1126              :          */
    1127            9 :         if (!winfo->error_mq_handle)
    1128            6 :             continue;
    1129              : 
    1130            4 :         res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
    1131              : 
    1132            4 :         if (res == SHM_MQ_WOULD_BLOCK)
    1133            1 :             continue;
    1134            3 :         else if (res == SHM_MQ_SUCCESS)
    1135              :         {
    1136              :             StringInfoData msg;
    1137              : 
    1138            3 :             initStringInfo(&msg);
    1139            3 :             appendBinaryStringInfo(&msg, data, nbytes);
    1140            3 :             ProcessParallelApplyMessage(&msg);
    1141            0 :             pfree(msg.data);
    1142              :         }
    1143              :         else
    1144            0 :             ereport(ERROR,
    1145              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1146              :                      errmsg("lost connection to the logical replication parallel apply worker")));
    1147              :     }
    1148              : 
    1149            5 :     MemoryContextSwitchTo(oldcontext);
    1150              : 
    1151              :     /* Might as well clear the context on our way out */
    1152            5 :     MemoryContextReset(hpam_context);
    1153              : 
    1154            5 :     RESUME_INTERRUPTS();
    1155            5 : }
    1156              : 
    1157              : /*
    1158              :  * Send the data to the specified parallel apply worker via shared-memory
    1159              :  * queue.
    1160              :  *
    1161              :  * Returns false if the attempt to send data via shared memory times out, true
    1162              :  * otherwise.
    1163              :  */
    1164              : bool
    1165        68911 : pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
    1166              : {
    1167              :     int         rc;
    1168              :     shm_mq_result result;
    1169        68911 :     TimestampTz startTime = 0;
    1170              : 
    1171              :     Assert(!IsTransactionState());
    1172              :     Assert(!winfo->serialize_changes);
    1173              : 
    1174              :     /*
    1175              :      * We don't try to send data to the parallel worker in 'immediate' mode,
    1176              :      * which is used primarily for testing purposes.
    1177              :      */
    1178        68911 :     if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
    1179            4 :         return false;
    1180              : 
    1181              : /*
    1182              :  * This timeout is a bit arbitrary but testing revealed that it is sufficient
    1183              :  * to send the message unless the parallel apply worker is waiting on some
    1184              :  * lock or there is a serious resource crunch. See the comments atop this file
    1185              :  * to know why we are using a non-blocking way to send the message.
    1186              :  */
    1187              : #define SHM_SEND_RETRY_INTERVAL_MS 1000
    1188              : #define SHM_SEND_TIMEOUT_MS     (10000 - SHM_SEND_RETRY_INTERVAL_MS)
    1189              : 
    1190              :     for (;;)
    1191              :     {
    1192        68907 :         result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
    1193              : 
    1194        68907 :         if (result == SHM_MQ_SUCCESS)
    1195        68907 :             return true;
    1196            0 :         else if (result == SHM_MQ_DETACHED)
    1197            0 :             ereport(ERROR,
    1198              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1199              :                      errmsg("could not send data to shared-memory queue")));
    1200              : 
    1201              :         Assert(result == SHM_MQ_WOULD_BLOCK);
    1202              : 
    1203              :         /* Wait before retrying. */
    1204            0 :         rc = WaitLatch(MyLatch,
    1205              :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1206              :                        SHM_SEND_RETRY_INTERVAL_MS,
    1207              :                        WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
    1208              : 
    1209            0 :         if (rc & WL_LATCH_SET)
    1210              :         {
    1211            0 :             ResetLatch(MyLatch);
    1212            0 :             CHECK_FOR_INTERRUPTS();
    1213              :         }
    1214              : 
    1215            0 :         if (startTime == 0)
    1216            0 :             startTime = GetCurrentTimestamp();
    1217            0 :         else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
    1218              :                                             SHM_SEND_TIMEOUT_MS))
    1219            0 :             return false;
    1220              :     }
    1221              : }
    1222              : 
    1223              : /*
    1224              :  * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
    1225              :  * that the current data and any subsequent data for this transaction will be
    1226              :  * serialized to a file. This is done to prevent possible deadlocks with
    1227              :  * another parallel apply worker (refer to the comments atop this file).
    1228              :  */
    1229              : void
    1230            4 : pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
    1231              :                                bool stream_locked)
    1232              : {
    1233            4 :     ereport(LOG,
    1234              :             (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
    1235              :                     winfo->shared->xid)));
    1236              : 
    1237              :     /*
    1238              :      * The parallel apply worker could be stuck for some reason (say, waiting
    1239              :      * on a lock held by another backend), so stop trying to send data directly
    1240              :      * it and start serializing data to the file instead.
    1241              :      */
    1242            4 :     winfo->serialize_changes = true;
    1243              : 
    1244              :     /* Initialize the stream fileset. */
    1245            4 :     stream_start_internal(winfo->shared->xid, true);
    1246              : 
    1247              :     /*
    1248              :      * Acquire the stream lock, if not already held, to make sure that the
    1249              :      * parallel apply worker waits for the leader to release the stream lock
    1250              :      * until the end of the transaction.
    1251              :      */
    1252            4 :     if (!stream_locked)
    1253            4 :         pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
    1254              : 
    1255            4 :     pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
    1256            4 : }
    1257              : 
    1258              : /*
    1259              :  * Wait until the parallel apply worker's transaction state has reached or
    1260              :  * exceeded the given xact_state.
    1261              :  */
    1262              : static void
    1263           28 : pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
    1264              :                        ParallelTransState xact_state)
    1265              : {
    1266              :     for (;;)
    1267              :     {
    1268              :         /*
    1269              :          * Stop if the transaction state has reached or exceeded the given
    1270              :          * xact_state.
    1271              :          */
    1272          233 :         if (pa_get_xact_state(winfo->shared) >= xact_state)
    1273           28 :             break;
    1274              : 
    1275              :         /* Wait to be signalled. */
    1276          205 :         (void) WaitLatch(MyLatch,
    1277              :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1278              :                          10L,
    1279              :                          WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
    1280              : 
    1281              :         /* Reset the latch so we don't spin. */
    1282          205 :         ResetLatch(MyLatch);
    1283              : 
    1284              :         /* An interrupt may have occurred while we were waiting. */
    1285          205 :         CHECK_FOR_INTERRUPTS();
    1286              :     }
    1287           28 : }
    1288              : 
    1289              : /*
    1290              :  * Wait until the parallel apply worker's transaction finishes.
    1291              :  */
    1292              : static void
    1293           28 : pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
    1294              : {
    1295              :     /*
    1296              :      * Wait until the parallel apply worker sets the state to
    1297              :      * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
    1298              :      * lock. This prevents the leader apply worker from acquiring the
    1299              :      * transaction lock earlier than the parallel apply worker.
    1300              :      */
    1301           28 :     pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
    1302              : 
    1303              :     /*
    1304              :      * Wait for the transaction lock to be released. This is required to
    1305              :      * detect deadlock among leader and parallel apply workers. Refer to the
    1306              :      * comments atop this file.
    1307              :      */
    1308           28 :     pa_lock_transaction(winfo->shared->xid, AccessShareLock);
    1309           25 :     pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
    1310              : 
    1311              :     /*
    1312              :      * Check whether the state has become PARALLEL_TRANS_FINISHED, in case
    1313              :      * the parallel apply worker failed while applying changes, causing the
    1314              :      * lock to be released.
    1315              :      */
    1316           25 :     if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
    1317            0 :         ereport(ERROR,
    1318              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1319              :                  errmsg("lost connection to the logical replication parallel apply worker")));
    1320           25 : }
    1321              : 
    1322              : /*
    1323              :  * Set the transaction state for a given parallel apply worker.
    1324              :  */
    1325              : void
    1326           55 : pa_set_xact_state(ParallelApplyWorkerShared *wshared,
    1327              :                   ParallelTransState xact_state)
    1328              : {
    1329           55 :     SpinLockAcquire(&wshared->mutex);
    1330           55 :     wshared->xact_state = xact_state;
    1331           55 :     SpinLockRelease(&wshared->mutex);
    1332           55 : }
    1333              : 
    1334              : /*
    1335              :  * Get the transaction state for a given parallel apply worker.
    1336              :  */
    1337              : static ParallelTransState
    1338          258 : pa_get_xact_state(ParallelApplyWorkerShared *wshared)
    1339              : {
    1340              :     ParallelTransState xact_state;
    1341              : 
    1342          258 :     SpinLockAcquire(&wshared->mutex);
    1343          258 :     xact_state = wshared->xact_state;
    1344          258 :     SpinLockRelease(&wshared->mutex);
    1345              : 
    1346          258 :     return xact_state;
    1347              : }
    1348              : 
    1349              : /*
    1350              :  * Cache the parallel apply worker information.
    1351              :  */
    1352              : void
    1353          512 : pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
    1354              : {
    1355          512 :     stream_apply_worker = winfo;
    1356          512 : }
    1357              : 
    1358              : /*
    1359              :  * Form a unique savepoint name for the streaming transaction.
    1360              :  *
    1361              :  * Note that different subscriptions for publications on different nodes can
    1362              :  * receive the same remote xid, so we use the subscription id along with it.
    1363              :  *
    1364              :  * Returns the name in the supplied buffer.
    1365              :  */
    1366              : static void
    1367           27 : pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
    1368              : {
    1369           27 :     snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
    1370           27 : }
    1371              : 
    1372              : /*
    1373              :  * Define a savepoint for a subxact in parallel apply worker if needed.
    1374              :  *
    1375              :  * The parallel apply worker can figure out if a new subtransaction was
    1376              :  * started by checking whether the new change arrived with a different xid. In
    1377              :  * that case, define a named savepoint so that we are able to roll back to it
    1378              :  * if required.
    1379              :  */
    1380              : void
    1381        68625 : pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
    1382              : {
    1383        68625 :     if (current_xid != top_xid &&
    1384           52 :         !list_member_xid(subxactlist, current_xid))
    1385              :     {
    1386              :         MemoryContext oldctx;
    1387              :         char        spname[NAMEDATALEN];
    1388              : 
    1389           17 :         pa_savepoint_name(MySubscription->oid, current_xid,
    1390              :                           spname, sizeof(spname));
    1391              : 
    1392           17 :         elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
    1393              : 
    1394              :         /* We must be in transaction block to define the SAVEPOINT. */
    1395           17 :         if (!IsTransactionBlock())
    1396              :         {
    1397            5 :             if (!IsTransactionState())
    1398            0 :                 StartTransactionCommand();
    1399              : 
    1400            5 :             BeginTransactionBlock();
    1401            5 :             CommitTransactionCommand();
    1402              :         }
    1403              : 
    1404           17 :         DefineSavepoint(spname);
    1405              : 
    1406              :         /*
    1407              :          * CommitTransactionCommand is needed to start a subtransaction after
    1408              :          * issuing a SAVEPOINT inside a transaction block (see
    1409              :          * StartSubTransaction()).
    1410              :          */
    1411           17 :         CommitTransactionCommand();
    1412              : 
    1413           17 :         oldctx = MemoryContextSwitchTo(TopTransactionContext);
    1414           17 :         subxactlist = lappend_xid(subxactlist, current_xid);
    1415           17 :         MemoryContextSwitchTo(oldctx);
    1416              :     }
    1417        68625 : }
    1418              : 
    1419              : /* Reset the list that maintains subtransactions. */
    1420              : void
    1421           25 : pa_reset_subtrans(void)
    1422              : {
    1423              :     /*
    1424              :      * We don't need to free this explicitly as the allocated memory will be
    1425              :      * freed at the transaction end.
    1426              :      */
    1427           25 :     subxactlist = NIL;
    1428           25 : }
    1429              : 
    1430              : /*
    1431              :  * Handle STREAM ABORT message when the transaction was applied in a parallel
    1432              :  * apply worker.
    1433              :  */
    1434              : void
    1435           12 : pa_stream_abort(LogicalRepStreamAbortData *abort_data)
    1436              : {
    1437           12 :     TransactionId xid = abort_data->xid;
    1438           12 :     TransactionId subxid = abort_data->subxid;
    1439              : 
    1440              :     /*
    1441              :      * Update origin state so we can restart streaming from correct position
    1442              :      * in case of crash.
    1443              :      */
    1444           12 :     replorigin_xact_state.origin_lsn = abort_data->abort_lsn;
    1445           12 :     replorigin_xact_state.origin_timestamp = abort_data->abort_time;
    1446              : 
    1447              :     /*
    1448              :      * If the two XIDs are the same, it's in fact an abort of the toplevel
    1449              :      * xact, so just reset the subxactlist.
    1450              :      */
    1451           12 :     if (subxid == xid)
    1452              :     {
    1453            2 :         pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    1454              : 
    1455              :         /*
    1456              :          * Release the lock as we might be processing an empty streaming
    1457              :          * transaction in which case the lock won't be released during
    1458              :          * transaction rollback.
    1459              :          *
    1460              :          * Note that it's ok to release the transaction lock before aborting
    1461              :          * the transaction because even if the parallel apply worker dies due
    1462              :          * to crash or some other reason, such a transaction would still be
    1463              :          * considered aborted.
    1464              :          */
    1465            2 :         pa_unlock_transaction(xid, AccessExclusiveLock);
    1466              : 
    1467            2 :         AbortCurrentTransaction();
    1468              : 
    1469            2 :         if (IsTransactionBlock())
    1470              :         {
    1471            1 :             EndTransactionBlock(false);
    1472            1 :             CommitTransactionCommand();
    1473              :         }
    1474              : 
    1475            2 :         pa_reset_subtrans();
    1476              : 
    1477            2 :         pgstat_report_activity(STATE_IDLE, NULL);
    1478              :     }
    1479              :     else
    1480              :     {
    1481              :         /* OK, so it's a subxact. Rollback to the savepoint. */
    1482              :         int         i;
    1483              :         char        spname[NAMEDATALEN];
    1484              : 
    1485           10 :         pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
    1486              : 
    1487           10 :         elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
    1488              : 
    1489              :         /*
    1490              :          * Search the subxactlist, determine the offset tracked for the
    1491              :          * subxact, and truncate the list.
    1492              :          *
    1493              :          * Note that for an empty sub-transaction we won't find the subxid
    1494              :          * here.
    1495              :          */
    1496           12 :         for (i = list_length(subxactlist) - 1; i >= 0; i--)
    1497              :         {
    1498           11 :             TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
    1499              : 
    1500           11 :             if (xid_tmp == subxid)
    1501              :             {
    1502            9 :                 RollbackToSavepoint(spname);
    1503            9 :                 CommitTransactionCommand();
    1504            9 :                 subxactlist = list_truncate(subxactlist, i);
    1505            9 :                 break;
    1506              :             }
    1507              :         }
    1508              :     }
    1509           12 : }
    1510              : 
    1511              : /*
    1512              :  * Set the fileset state for a particular parallel apply worker. The fileset
    1513              :  * will be set once the leader worker has serialized all changes to the file,
    1514              :  * so that it can be used by the parallel apply worker.
    1515              :  */
    1516              : void
    1517           16 : pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
    1518              :                      PartialFileSetState fileset_state)
    1519              : {
    1520           16 :     SpinLockAcquire(&wshared->mutex);
    1521           16 :     wshared->fileset_state = fileset_state;
    1522              : 
    1523           16 :     if (fileset_state == FS_SERIALIZE_DONE)
    1524              :     {
    1525              :         Assert(am_leader_apply_worker());
    1526              :         Assert(MyLogicalRepWorker->stream_fileset);
    1527            4 :         wshared->fileset = *MyLogicalRepWorker->stream_fileset;
    1528              :     }
    1529              : 
    1530           16 :     SpinLockRelease(&wshared->mutex);
    1531           16 : }
    1532              : 
    1533              : /*
    1534              :  * Get the fileset state for the current parallel apply worker.
    1535              :  */
    1536              : static PartialFileSetState
    1537           80 : pa_get_fileset_state(void)
    1538              : {
    1539              :     PartialFileSetState fileset_state;
    1540              : 
    1541              :     Assert(am_parallel_apply_worker());
    1542              : 
    1543           80 :     SpinLockAcquire(&MyParallelShared->mutex);
    1544           80 :     fileset_state = MyParallelShared->fileset_state;
    1545           80 :     SpinLockRelease(&MyParallelShared->mutex);
    1546              : 
    1547           80 :     return fileset_state;
    1548              : }
    1549              : 
    1550              : /*
    1551              :  * Helper functions to acquire and release a lock for each stream block.
    1552              :  *
    1553              :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
    1554              :  * stream lock.
    1555              :  *
    1556              :  * Refer to the comments atop this file to see how the stream lock is used.
    1557              :  */
    1558              : void
    1559          282 : pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
    1560              : {
    1561          282 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1562              :                                    PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1563          280 : }
    1564              : 
    1565              : void
    1566          278 : pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
    1567              : {
    1568          278 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1569              :                                      PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1570          278 : }
    1571              : 
    1572              : /*
    1573              :  * Helper functions to acquire and release a lock for each local transaction
    1574              :  * apply.
    1575              :  *
    1576              :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
    1577              :  * transaction lock.
    1578              :  *
 * Note that all callers must pass a remote transaction ID, rather than a
 * local transaction ID, as xid. This is because the local transaction ID is
 * assigned only while applying the first change in the parallel apply
 * worker, but that first change may itself be blocked by a concurrently
 * executing transaction in another parallel apply worker. Since the local
 * transaction ID can be communicated to the leader only after the first
 * change has been applied, the leader could not use it to wait after sending
 * the xact finish command; hence we lock on the remote transaction ID.
    1587              :  *
    1588              :  * Refer to the comments atop this file to see how the transaction lock is
    1589              :  * used.
    1590              :  */
    1591              : void
    1592           58 : pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
    1593              : {
    1594           58 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1595              :                                    PARALLEL_APPLY_LOCK_XACT, lockmode);
    1596           55 : }
    1597              : 
    1598              : void
    1599           50 : pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
    1600              : {
    1601           50 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1602              :                                      PARALLEL_APPLY_LOCK_XACT, lockmode);
    1603           50 : }
    1604              : 
    1605              : /*
    1606              :  * Decrement the number of pending streaming blocks and wait on the stream lock
    1607              :  * if there is no pending block available.
    1608              :  */
    1609              : void
    1610          255 : pa_decr_and_wait_stream_block(void)
    1611              : {
    1612              :     Assert(am_parallel_apply_worker());
    1613              : 
    1614              :     /*
    1615              :      * It is only possible to not have any pending stream chunks when we are
    1616              :      * applying spooled messages.
    1617              :      */
    1618          255 :     if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
    1619              :     {
    1620           16 :         if (pa_has_spooled_message_pending())
    1621           16 :             return;
    1622              : 
    1623            0 :         elog(ERROR, "invalid pending streaming chunk 0");
    1624              :     }
    1625              : 
    1626          239 :     if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
    1627              :     {
    1628           28 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
    1629           26 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
    1630              :     }
    1631              : }
    1632              : 
    1633              : /*
    1634              :  * Finish processing the streaming transaction in the leader apply worker.
    1635              :  */
    1636              : void
    1637           28 : pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
    1638              : {
    1639              :     Assert(am_leader_apply_worker());
    1640              : 
    1641              :     /*
    1642              :      * Unlock the shared object lock so that parallel apply worker can
    1643              :      * continue to receive and apply changes.
    1644              :      */
    1645           28 :     pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
    1646              : 
    1647              :     /*
    1648              :      * Wait for that worker to finish. This is necessary to maintain commit
    1649              :      * order which avoids failures due to transaction dependencies and
    1650              :      * deadlocks.
    1651              :      */
    1652           28 :     pa_wait_for_xact_finish(winfo);
    1653              : 
    1654           25 :     if (XLogRecPtrIsValid(remote_lsn))
    1655           23 :         store_flush_position(remote_lsn, winfo->shared->last_commit_end);
    1656              : 
    1657           25 :     pa_free_worker(winfo);
    1658           25 : }
        

Generated by: LCOV version 2.0-1