LCOV - code coverage report
Current view: top level - src/backend/replication/logical - applyparallelworker.c (source / functions)
Test: PostgreSQL 19devel                Lines:     89.5 % (367 of 410 hit)
Test Date: 2026-03-14 14:14:39          Functions: 100.0 % (36 of 36 hit)

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  * applyparallelworker.c
       3              :  *     Support routines for applying xact by parallel apply worker
       4              :  *
       5              :  * Copyright (c) 2023-2026, PostgreSQL Global Development Group
       6              :  *
       7              :  * IDENTIFICATION
       8              :  *    src/backend/replication/logical/applyparallelworker.c
       9              :  *
       10              :  * This file contains the code to launch, set up, and tear down a parallel apply
      11              :  * worker which receives the changes from the leader worker and invokes routines
      12              :  * to apply those on the subscriber database. Additionally, this file contains
      13              :  * routines that are intended to support setting up, using, and tearing down a
      14              :  * ParallelApplyWorkerInfo which is required so the leader worker and parallel
      15              :  * apply workers can communicate with each other.
      16              :  *
      17              :  * The parallel apply workers are assigned (if available) as soon as xact's
      18              :  * first stream is received for subscriptions that have set their 'streaming'
      19              :  * option as parallel. The leader apply worker will send changes to this new
      20              :  * worker via shared memory. We keep this worker assigned till the transaction
      21              :  * commit is received and also wait for the worker to finish at commit. This
       22              :  * preserves commit ordering and avoids file I/O in most cases, although we
      23              :  * still need to spill to a file if there is no worker available. See comments
      24              :  * atop logical/worker to know more about streamed xacts whose changes are
      25              :  * spilled to disk. It is important to maintain commit order to avoid failures
      26              :  * due to: (a) transaction dependencies - say if we insert a row in the first
      27              :  * transaction and update it in the second transaction on publisher then
      28              :  * allowing the subscriber to apply both in parallel can lead to failure in the
      29              :  * update; (b) deadlocks - allowing transactions that update the same set of
      30              :  * rows/tables in the opposite order to be applied in parallel can lead to
      31              :  * deadlocks.
      32              :  *
      33              :  * A worker pool is used to avoid restarting workers for each streaming
      34              :  * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
      35              :  * in the ParallelApplyWorkerPool. After successfully launching a new worker,
      36              :  * its information is added to the ParallelApplyWorkerPool. Once the worker
      37              :  * finishes applying the transaction, it is marked as available for re-use.
      38              :  * Now, before starting a new worker to apply the streaming transaction, we
      39              :  * check the list for any available worker. Note that we retain a maximum of
      40              :  * half the max_parallel_apply_workers_per_subscription workers in the pool and
       41              :  * beyond that, the worker simply exits after applying the transaction.
      42              :  *
      43              :  * XXX This worker pool threshold is arbitrary and we can provide a GUC
      44              :  * variable for this in the future if required.
      45              :  *
      46              :  * The leader apply worker will create a separate dynamic shared memory segment
      47              :  * when each parallel apply worker starts. The reason for this design is that
      48              :  * we cannot predict how many workers will be needed. It may be possible to
      49              :  * allocate enough shared memory in one segment based on the maximum number of
      50              :  * parallel apply workers (max_parallel_apply_workers_per_subscription), but
      51              :  * this would waste memory if no process is actually started.
      52              :  *
      53              :  * The dynamic shared memory segment contains: (a) a shm_mq that is used to
      54              :  * send changes in the transaction from leader apply worker to parallel apply
      55              :  * worker; (b) another shm_mq that is used to send errors (and other messages
      56              :  * reported via elog/ereport) from the parallel apply worker to leader apply
      57              :  * worker; (c) necessary information to be shared among parallel apply workers
      58              :  * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
      59              :  *
      60              :  * Locking Considerations
      61              :  * ----------------------
      62              :  * We have a risk of deadlock due to concurrently applying the transactions in
      63              :  * parallel mode that were independent on the publisher side but became
      64              :  * dependent on the subscriber side due to the different database structures
      65              :  * (like schema of subscription tables, constraints, etc.) on each side. This
      66              :  * can happen even without parallel mode when there are concurrent operations
      67              :  * on the subscriber. In order to detect the deadlocks among leader (LA) and
       68              :  * parallel apply (PA) workers, we use lmgr locks when the PA waits for the
      69              :  * next stream (set of changes) and LA waits for PA to finish the transaction.
      70              :  * An alternative approach could be to not allow parallelism when the schema of
      71              :  * tables is different between the publisher and subscriber but that would be
      72              :  * too restrictive and would require the publisher to send much more
      73              :  * information than it is currently sending.
      74              :  *
      75              :  * Consider a case where the subscribed table does not have a unique key on the
      76              :  * publisher and has a unique key on the subscriber. The deadlock can happen in
      77              :  * the following ways:
      78              :  *
      79              :  * 1) Deadlock between the leader apply worker and a parallel apply worker
      80              :  *
      81              :  * Consider that the parallel apply worker (PA) is executing TX-1 and the
      82              :  * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
      83              :  * Now, LA is waiting for PA because of the unique key constraint of the
      84              :  * subscribed table while PA is waiting for LA to send the next stream of
      85              :  * changes or transaction finish command message.
      86              :  *
      87              :  * In order for lmgr to detect this, we have LA acquire a session lock on the
      88              :  * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
      89              :  * trying to receive the next stream of changes. Specifically, LA will acquire
      90              :  * the lock in AccessExclusive mode before sending the STREAM_STOP and will
       91              :  * release it (if already acquired) after sending the STREAM_START, STREAM_ABORT
      92              :  * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
      93              :  * acquire the lock in AccessShare mode after processing STREAM_STOP and
      94              :  * STREAM_ABORT (for subtransaction) and then release the lock immediately
      95              :  * after acquiring it.
      96              :  *
      97              :  * The lock graph for the above example will look as follows:
      98              :  * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
      99              :  * acquire the stream lock) -> LA
     100              :  *
     101              :  * This way, when PA is waiting for LA for the next stream of changes, we can
     102              :  * have a wait-edge from PA to LA in lmgr, which will make us detect the
     103              :  * deadlock between LA and PA.
     104              :  *
     105              :  * 2) Deadlock between the leader apply worker and parallel apply workers
     106              :  *
     107              :  * This scenario is similar to the first case but TX-1 and TX-2 are executed by
     108              :  * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
     109              :  * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
     110              :  * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
     111              :  * transaction in order to preserve the commit order. There is a deadlock among
     112              :  * the three processes.
     113              :  *
     114              :  * In order for lmgr to detect this, we have PA acquire a session lock (this is
      115              :  * a different lock than the one referred to in the previous case; see
     116              :  * pa_lock_transaction()) on the transaction being applied and have LA wait on
     117              :  * the lock before proceeding in the transaction finish commands. Specifically,
     118              :  * PA will acquire this lock in AccessExclusive mode before executing the first
     119              :  * message of the transaction and release it at the xact end. LA will acquire
     120              :  * this lock in AccessShare mode at transaction finish commands and release it
     121              :  * immediately.
     122              :  *
     123              :  * The lock graph for the above example will look as follows:
     124              :  * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
     125              :  * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
     126              :  * lock) -> LA
     127              :  *
      128              :  * This way, when LA is waiting to finish the transaction end command to preserve
     129              :  * the commit order, we will be able to detect deadlock, if any.
     130              :  *
     131              :  * One might think we can use XactLockTableWait(), but XactLockTableWait()
     132              :  * considers PREPARED TRANSACTION as still in progress which means the lock
     133              :  * won't be released even after the parallel apply worker has prepared the
     134              :  * transaction.
     135              :  *
     136              :  * 3) Deadlock when the shm_mq buffer is full
     137              :  *
      138              :  * In the previous scenario (i.e., PA-1 and PA-2 are executing transactions
     139              :  * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
     140              :  * wait to send messages, and this wait doesn't appear in lmgr.
     141              :  *
     142              :  * To avoid this wait, we use a non-blocking write and wait with a timeout. If
     143              :  * the timeout is exceeded, the LA will serialize all the pending messages to
      144              :  * a file and indicate to PA-2 that it needs to read that file for the remaining
     145              :  * messages. Then LA will start waiting for commit as in the previous case
     146              :  * which will detect deadlock if any. See pa_send_data() and
     147              :  * enum TransApplyAction.
     148              :  *
     149              :  * Lock types
     150              :  * ----------
     151              :  * Both the stream lock and the transaction lock mentioned above are
     152              :  * session-level locks because both locks could be acquired outside the
     153              :  * transaction, and the stream lock in the leader needs to persist across
      154              :  * transaction boundaries, i.e., until the end of the streaming transaction.
     155              :  *-------------------------------------------------------------------------
     156              :  */
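To make the stream-lock choreography in deadlock case (1) above concrete, here is a condensed, non-literal sketch of the two sides. pa_lock_stream()/pa_unlock_stream() are the wrappers defined later in this file; the send/receive steps are abbreviated placeholders, not real function names:

```c
/* Leader apply worker (LA) -- sketch only, not the literal control flow */
pa_lock_stream(xid, AccessExclusiveLock);   /* before sending STREAM_STOP */
/* ... send STREAM_STOP; later send STREAM_START / STREAM_COMMIT ... */
pa_unlock_stream(xid, AccessExclusiveLock); /* after STREAM_START etc. */

/* Parallel apply worker (PA) -- after processing STREAM_STOP */
pa_lock_stream(xid, AccessShareLock);   /* blocks while LA holds the lock,
                                         * giving lmgr a PA -> LA wait-edge */
pa_unlock_stream(xid, AccessShareLock); /* released immediately */
```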
     157              : 
     158              : #include "postgres.h"
     159              : 
     160              : #include "libpq/pqformat.h"
     161              : #include "libpq/pqmq.h"
     162              : #include "pgstat.h"
     163              : #include "postmaster/interrupt.h"
     164              : #include "replication/logicallauncher.h"
     165              : #include "replication/logicalworker.h"
     166              : #include "replication/origin.h"
     167              : #include "replication/worker_internal.h"
     168              : #include "storage/ipc.h"
     169              : #include "storage/latch.h"
     170              : #include "storage/lmgr.h"
     171              : #include "storage/proc.h"
     172              : #include "tcop/tcopprot.h"
     173              : #include "utils/inval.h"
     174              : #include "utils/memutils.h"
     175              : #include "utils/syscache.h"
     176              : #include "utils/wait_event.h"
     177              : 
     178              : #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
     179              : 
     180              : /*
      181              :  * DSM keys for the parallel apply worker. Unlike other parallel execution
      182              :  * code, we don't need to worry about DSM keys conflicting with plan_node_id,
      183              :  * so we can use small integers.
     184              :  */
     185              : #define PARALLEL_APPLY_KEY_SHARED       1
     186              : #define PARALLEL_APPLY_KEY_MQ           2
     187              : #define PARALLEL_APPLY_KEY_ERROR_QUEUE  3
     188              : 
     189              : /* Queue size of DSM, 16 MB for now. */
     190              : #define DSM_QUEUE_SIZE  (16 * 1024 * 1024)
     191              : 
     192              : /*
     193              :  * Error queue size of DSM. It is desirable to make it large enough that a
     194              :  * typical ErrorResponse can be sent without blocking. That way, a worker that
     195              :  * errors out can write the whole message into the queue and terminate without
     196              :  * waiting for the user backend.
     197              :  */
     198              : #define DSM_ERROR_QUEUE_SIZE            (16 * 1024)
     199              : 
     200              : /*
     201              :  * There are three fields in each message received by the parallel apply
      202              :  * worker: start_lsn, end_lsn and send_time. Because the leader apply worker
      203              :  * has already updated these statistics, the parallel apply worker can ignore
      204              :  * these fields (see function LogicalRepApplyLoop).
     205              :  */
     206              : #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
     207              : 
     208              : /*
     209              :  * The type of session-level lock on a transaction being applied on a logical
     210              :  * replication subscriber.
     211              :  */
     212              : #define PARALLEL_APPLY_LOCK_STREAM  0
     213              : #define PARALLEL_APPLY_LOCK_XACT    1
     214              : 
     215              : /*
     216              :  * Hash table entry to map xid to the parallel apply worker state.
     217              :  */
     218              : typedef struct ParallelApplyWorkerEntry
     219              : {
     220              :     TransactionId xid;          /* Hash key -- must be first */
     221              :     ParallelApplyWorkerInfo *winfo;
     222              : } ParallelApplyWorkerEntry;
     223              : 
     224              : /*
     225              :  * A hash table used to cache the state of streaming transactions being applied
     226              :  * by the parallel apply workers.
     227              :  */
     228              : static HTAB *ParallelApplyTxnHash = NULL;
     229              : 
     230              : /*
      231              :  * A list (pool) of active parallel apply workers. The information for
      232              :  * the new worker is added to the list after successfully launching it. The
      233              :  * list entry is removed if there are already enough workers in the worker
      234              :  * pool at the end of the transaction. For more information about the worker
      235              :  * pool, see comments atop this file.
     236              :  */
     237              : static List *ParallelApplyWorkerPool = NIL;
     238              : 
     239              : /*
     240              :  * Information shared between leader apply worker and parallel apply worker.
     241              :  */
     242              : ParallelApplyWorkerShared *MyParallelShared = NULL;
     243              : 
     244              : /*
     245              :  * Is there a message sent by a parallel apply worker that the leader apply
     246              :  * worker needs to receive?
     247              :  */
     248              : volatile sig_atomic_t ParallelApplyMessagePending = false;
     249              : 
     250              : /*
     251              :  * Cache the parallel apply worker information required for applying the
     252              :  * current streaming transaction. It is used to save the cost of searching the
     253              :  * hash table when applying the changes between STREAM_START and STREAM_STOP.
     254              :  */
     255              : static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
     256              : 
     257              : /* A list to maintain subtransactions, if any. */
     258              : static List *subxactlist = NIL;
     259              : 
     260              : static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
     261              : static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
     262              : static PartialFileSetState pa_get_fileset_state(void);
     263              : 
     264              : /*
     265              :  * Returns true if it is OK to start a parallel apply worker, false otherwise.
     266              :  */
     267              : static bool
     268           86 : pa_can_start(void)
     269              : {
     270              :     /* Only leader apply workers can start parallel apply workers. */
     271           86 :     if (!am_leader_apply_worker())
     272           29 :         return false;
     273              : 
     274              :     /*
      275              :      * Check for any change in the subscription parameters, so that a change
      276              :      * does not go unreflected for a very long time. This can happen when
      277              :      * there is a constant flow of streaming transactions that are handled
      278              :      * by parallel apply workers.
      279              :      *
      280              :      * Do this before the checks below so that the latest subscription
      281              :      * values can be used for those checks.
     282              :      */
     283           57 :     maybe_reread_subscription();
     284              : 
     285              :     /*
     286              :      * Don't start a new parallel apply worker if the subscription is not
     287              :      * using parallel streaming mode, or if the publisher does not support
     288              :      * parallel apply.
     289              :      */
     290           57 :     if (!MyLogicalRepWorker->parallel_apply)
     291           28 :         return false;
     292              : 
     293              :     /*
      294              :      * Don't start a new parallel apply worker if the user has set skiplsn as it's
     295              :      * possible that they want to skip the streaming transaction. For
     296              :      * streaming transactions, we need to serialize the transaction to a file
     297              :      * so that we can get the last LSN of the transaction to judge whether to
     298              :      * skip before starting to apply the change.
     299              :      *
     300              :      * One might think that we could allow parallelism if the first lsn of the
     301              :      * transaction is greater than skiplsn, but we don't send it with the
     302              :      * STREAM START message, and it doesn't seem worth sending the extra eight
     303              :      * bytes with the STREAM START to enable parallelism for this case.
     304              :      */
     305           29 :     if (XLogRecPtrIsValid(MySubscription->skiplsn))
     306            0 :         return false;
     307              : 
     308              :     /*
     309              :      * For streaming transactions that are being applied using a parallel
     310              :      * apply worker, we cannot decide whether to apply the change for a
     311              :      * relation that is not in the READY state (see
     312              :      * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
     313              :      * time. So, we don't start the new parallel apply worker in this case.
     314              :      */
     315           29 :     if (!AllTablesyncsReady())
     316            0 :         return false;
     317              : 
     318           29 :     return true;
     319              : }
     320              : 
     321              : /*
     322              :  * Set up a dynamic shared memory segment.
     323              :  *
     324              :  * We set up a control region that contains a fixed-size worker info
     325              :  * (ParallelApplyWorkerShared), a message queue, and an error queue.
     326              :  *
     327              :  * Returns true on success, false on failure.
     328              :  */
     329              : static bool
     330           12 : pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
     331              : {
     332              :     shm_toc_estimator e;
     333              :     Size        segsize;
     334              :     dsm_segment *seg;
     335              :     shm_toc    *toc;
     336              :     ParallelApplyWorkerShared *shared;
     337              :     shm_mq     *mq;
     338           12 :     Size        queue_size = DSM_QUEUE_SIZE;
     339           12 :     Size        error_queue_size = DSM_ERROR_QUEUE_SIZE;
     340              : 
     341              :     /*
     342              :      * Estimate how much shared memory we need.
     343              :      *
     344              :      * Because the TOC machinery may choose to insert padding of oddly-sized
     345              :      * requests, we must estimate each chunk separately.
     346              :      *
     347              :      * We need one key to register the location of the header, and two other
     348              :      * keys to track the locations of the message queue and the error message
     349              :      * queue.
     350              :      */
     351           12 :     shm_toc_initialize_estimator(&e);
     352           12 :     shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
     353           12 :     shm_toc_estimate_chunk(&e, queue_size);
     354           12 :     shm_toc_estimate_chunk(&e, error_queue_size);
     355              : 
     356           12 :     shm_toc_estimate_keys(&e, 3);
     357           12 :     segsize = shm_toc_estimate(&e);
     358              : 
     359              :     /* Create the shared memory segment and establish a table of contents. */
     360           12 :     seg = dsm_create(shm_toc_estimate(&e), 0);
     361           12 :     if (!seg)
     362            0 :         return false;
     363              : 
     364           12 :     toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
     365              :                          segsize);
     366              : 
     367              :     /* Set up the header region. */
     368           12 :     shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
     369           12 :     SpinLockInit(&shared->mutex);
     370              : 
     371           12 :     shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     372           12 :     pg_atomic_init_u32(&(shared->pending_stream_count), 0);
     373           12 :     shared->last_commit_end = InvalidXLogRecPtr;
     374           12 :     shared->fileset_state = FS_EMPTY;
     375              : 
     376           12 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
     377              : 
     378              :     /* Set up message queue for the worker. */
     379           12 :     mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
     380           12 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
     381           12 :     shm_mq_set_sender(mq, MyProc);
     382              : 
     383              :     /* Attach the queue. */
     384           12 :     winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
     385              : 
     386              :     /* Set up error queue for the worker. */
     387           12 :     mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
     388              :                        error_queue_size);
     389           12 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
     390           12 :     shm_mq_set_receiver(mq, MyProc);
     391              : 
     392              :     /* Attach the queue. */
     393           12 :     winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
     394              : 
     395              :     /* Return results to caller. */
     396           12 :     winfo->dsm_seg = seg;
     397           12 :     winfo->shared = shared;
     398              : 
     399           12 :     return true;
     400              : }
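For orientation, the worker-side counterpart of this setup (done in ParallelApplyWorkerMain, later in this file) attaches to the same segment and swaps the queue roles. A condensed, non-literal sketch using the real shm_toc/shm_mq APIs; variable names here are illustrative only:

```c
/* Parallel apply worker -- sketch of attaching to the leader's DSM segment */
seg = dsm_attach(handle);       /* handle arrives via the bgworker main arg */
toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));

shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);

mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
shm_mq_set_receiver(mq, MyProc);        /* the leader was set as sender */
mqh = shm_mq_attach(mq, seg, NULL);

error_mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
shm_mq_set_sender(error_mq, MyProc);    /* the leader was set as receiver */
error_mqh = shm_mq_attach(error_mq, seg, NULL);
```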
     401              : 
     402              : /*
     403              :  * Try to get a parallel apply worker from the pool. If none is available then
     404              :  * start a new one.
     405              :  */
     406              : static ParallelApplyWorkerInfo *
     407           29 : pa_launch_parallel_worker(void)
     408              : {
     409              :     MemoryContext oldcontext;
     410              :     bool        launched;
     411              :     ParallelApplyWorkerInfo *winfo;
     412              :     ListCell   *lc;
     413              : 
     414              :     /* Try to get an available parallel apply worker from the worker pool. */
     415           31 :     foreach(lc, ParallelApplyWorkerPool)
     416              :     {
     417           19 :         winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     418              : 
     419           19 :         if (!winfo->in_use)
     420           17 :             return winfo;
     421              :     }
     422              : 
     423              :     /*
     424              :      * Start a new parallel apply worker.
     425              :      *
     426              :      * The worker info can be used for the lifetime of the worker process, so
     427              :      * create it in a permanent context.
     428              :      */
     429           12 :     oldcontext = MemoryContextSwitchTo(ApplyContext);
     430              : 
     431           12 :     winfo = palloc0_object(ParallelApplyWorkerInfo);
     432              : 
     433              :     /* Setup shared memory. */
     434           12 :     if (!pa_setup_dsm(winfo))
     435              :     {
     436            0 :         MemoryContextSwitchTo(oldcontext);
     437            0 :         pfree(winfo);
     438            0 :         return NULL;
     439              :     }
     440              : 
     441           12 :     launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
     442           12 :                                         MyLogicalRepWorker->dbid,
     443           12 :                                         MySubscription->oid,
     444           12 :                                         MySubscription->name,
     445           12 :                                         MyLogicalRepWorker->userid,
     446              :                                         InvalidOid,
     447              :                                         dsm_segment_handle(winfo->dsm_seg),
     448              :                                         false);
     449              : 
     450           12 :     if (launched)
     451              :     {
     452           12 :         ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
     453              :     }
     454              :     else
     455              :     {
     456            0 :         pa_free_worker_info(winfo);
     457            0 :         winfo = NULL;
     458              :     }
     459              : 
     460           12 :     MemoryContextSwitchTo(oldcontext);
     461              : 
     462           12 :     return winfo;
     463              : }
     464              : 
     465              : /*
     466              :  * Allocate a parallel apply worker that will be used for the specified xid.
     467              :  *
      468              :  * We first try to get an available worker from the pool, if any, and only
      469              :  * then try to launch a new one. On successful allocation, remember the worker
     470              :  * information in the hash table so that we can get it later for processing the
     471              :  * streaming changes.
     472              :  */
     473              : void
     474           86 : pa_allocate_worker(TransactionId xid)
     475              : {
     476              :     bool        found;
     477           86 :     ParallelApplyWorkerInfo *winfo = NULL;
     478              :     ParallelApplyWorkerEntry *entry;
     479              : 
     480           86 :     if (!pa_can_start())
     481           57 :         return;
     482              : 
     483           29 :     winfo = pa_launch_parallel_worker();
     484           29 :     if (!winfo)
     485            0 :         return;
     486              : 
     487              :     /* First time through, initialize parallel apply worker state hashtable. */
     488           29 :     if (!ParallelApplyTxnHash)
     489              :     {
     490              :         HASHCTL     ctl;
     491              : 
     492          117 :         MemSet(&ctl, 0, sizeof(ctl));
     493            9 :         ctl.keysize = sizeof(TransactionId);
     494            9 :         ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
     495            9 :         ctl.hcxt = ApplyContext;
     496              : 
     497            9 :         ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
     498              :                                            16, &ctl,
     499              :                                            HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     500              :     }
     501              : 
     502              :     /* Create an entry for the requested transaction. */
     503           29 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
     504           29 :     if (found)
     505            0 :         elog(ERROR, "hash table corrupted");
     506              : 
     507              :     /* Update the transaction information in shared memory. */
     508           29 :     SpinLockAcquire(&winfo->shared->mutex);
     509           29 :     winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     510           29 :     winfo->shared->xid = xid;
     511           29 :     SpinLockRelease(&winfo->shared->mutex);
     512              : 
     513           29 :     winfo->in_use = true;
     514           29 :     winfo->serialize_changes = false;
     515           29 :     entry->winfo = winfo;
     516              : }
     517              : 
     518              : /*
     519              :  * Find the assigned worker for the given transaction, if any.
     520              :  */
     521              : ParallelApplyWorkerInfo *
     522       257455 : pa_find_worker(TransactionId xid)
     523              : {
     524              :     bool        found;
     525              :     ParallelApplyWorkerEntry *entry;
     526              : 
     527       257455 :     if (!TransactionIdIsValid(xid))
     528        80272 :         return NULL;
     529              : 
     530       177183 :     if (!ParallelApplyTxnHash)
     531       103240 :         return NULL;
     532              : 
     533              :     /* Return the cached parallel apply worker if valid. */
     534        73943 :     if (stream_apply_worker)
     535        73651 :         return stream_apply_worker;
     536              : 
     537              :     /* Find an entry for the requested transaction. */
     538          292 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
     539          292 :     if (found)
     540              :     {
      541              :         /* The worker must not have exited. */
     542              :         Assert(entry->winfo->in_use);
     543          292 :         return entry->winfo;
     544              :     }
     545              : 
     546            0 :     return NULL;
     547              : }
     548              : 
     549              : /*
     550              :  * Makes the worker available for reuse.
     551              :  *
     552              :  * This removes the parallel apply worker entry from the hash table so that it
     553              :  * can't be used. If there are enough workers in the pool, it stops the worker
     554              :  * and frees the corresponding info. Otherwise it just marks the worker as
     555              :  * available for reuse.
     556              :  *
     557              :  * For more information about the worker pool, see comments atop this file.
     558              :  */
     559              : static void
     560           25 : pa_free_worker(ParallelApplyWorkerInfo *winfo)
     561              : {
     562              :     Assert(!am_parallel_apply_worker());
     563              :     Assert(winfo->in_use);
     564              :     Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
     565              : 
     566           25 :     if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
     567            0 :         elog(ERROR, "hash table corrupted");
     568              : 
     569              :     /*
     570              :      * Stop the worker if there are enough workers in the pool.
     571              :      *
      572              :      * XXX Additionally, we stop the worker if the leader apply worker
      573              :      * serialized part of the transaction data due to a send timeout. This is
     574              :      * because the message could be partially written to the queue and there
     575              :      * is no way to clean the queue other than resending the message until it
     576              :      * succeeds. Instead of trying to send the data which anyway would have
     577              :      * been serialized and then letting the parallel apply worker deal with
     578              :      * the spurious message, we stop the worker.
     579              :      */
     580           25 :     if (winfo->serialize_changes ||
     581           21 :         list_length(ParallelApplyWorkerPool) >
     582           21 :         (max_parallel_apply_workers_per_subscription / 2))
     583              :     {
     584            5 :         logicalrep_pa_worker_stop(winfo);
     585            5 :         pa_free_worker_info(winfo);
     586              : 
     587            5 :         return;
     588              :     }
     589              : 
     590           20 :     winfo->in_use = false;
     591           20 :     winfo->serialize_changes = false;
     592              : }
     593              : 
     594              : /*
     595              :  * Free the parallel apply worker information and unlink the files with
     596              :  * serialized changes if any.
     597              :  */
     598              : static void
     599            5 : pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
     600              : {
     601              :     Assert(winfo);
     602              : 
     603            5 :     if (winfo->mq_handle)
     604            5 :         shm_mq_detach(winfo->mq_handle);
     605              : 
     606            5 :     if (winfo->error_mq_handle)
     607            0 :         shm_mq_detach(winfo->error_mq_handle);
     608              : 
     609              :     /* Unlink the files with serialized changes. */
     610            5 :     if (winfo->serialize_changes)
     611            4 :         stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
     612              : 
     613            5 :     if (winfo->dsm_seg)
     614            5 :         dsm_detach(winfo->dsm_seg);
     615              : 
     616              :     /* Remove from the worker pool. */
     617            5 :     ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
     618              : 
     619            5 :     pfree(winfo);
     620            5 : }
     621              : 
     622              : /*
     623              :  * Detach the error queue for all parallel apply workers.
     624              :  */
     625              : void
     626          354 : pa_detach_all_error_mq(void)
     627              : {
     628              :     ListCell   *lc;
     629              : 
     630          361 :     foreach(lc, ParallelApplyWorkerPool)
     631              :     {
     632            7 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     633              : 
     634            7 :         if (winfo->error_mq_handle)
     635              :         {
     636            7 :             shm_mq_detach(winfo->error_mq_handle);
     637            7 :             winfo->error_mq_handle = NULL;
     638              :         }
     639              :     }
     640          354 : }
     641              : 
     642              : /*
     643              :  * Check if there are any pending spooled messages.
     644              :  */
     645              : static bool
     646           16 : pa_has_spooled_message_pending(void)
     647              : {
     648              :     PartialFileSetState fileset_state;
     649              : 
     650           16 :     fileset_state = pa_get_fileset_state();
     651              : 
     652           16 :     return (fileset_state != FS_EMPTY);
     653              : }
     654              : 
     655              : /*
     656              :  * Replay the spooled messages once the leader apply worker has finished
     657              :  * serializing changes to the file.
     658              :  *
     659              :  * Returns false if there aren't any pending spooled messages, true otherwise.
     660              :  */
     661              : static bool
     662           56 : pa_process_spooled_messages_if_required(void)
     663              : {
     664              :     PartialFileSetState fileset_state;
     665              : 
     666           56 :     fileset_state = pa_get_fileset_state();
     667              : 
     668           56 :     if (fileset_state == FS_EMPTY)
     669           48 :         return false;
     670              : 
     671              :     /*
     672              :      * If the leader apply worker is busy serializing the partial changes then
     673              :      * acquire the stream lock now and wait for the leader worker to finish
     674              :      * serializing the changes. Otherwise, the parallel apply worker won't get
     675              :      * a chance to receive a STREAM_STOP (and acquire the stream lock) until
      676              :      * the leader has serialized all changes, which can lead to an undetected
     677              :      * deadlock.
     678              :      *
     679              :      * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
     680              :      * worker has finished serializing the changes.
     681              :      */
     682            8 :     if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
     683              :     {
     684            0 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
     685            0 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
     686              : 
     687            0 :         fileset_state = pa_get_fileset_state();
     688              :     }
     689              : 
     690              :     /*
     691              :      * We cannot read the file immediately after the leader has serialized all
     692              :      * changes to the file because there may still be messages in the memory
     693              :      * queue. We will apply all spooled messages the next time we call this
     694              :      * function and that will ensure there are no messages left in the memory
     695              :      * queue.
     696              :      */
     697            8 :     if (fileset_state == FS_SERIALIZE_DONE)
     698              :     {
     699            4 :         pa_set_fileset_state(MyParallelShared, FS_READY);
     700              :     }
     701            4 :     else if (fileset_state == FS_READY)
     702              :     {
     703            4 :         apply_spooled_messages(&MyParallelShared->fileset,
     704            4 :                                MyParallelShared->xid,
     705              :                                InvalidXLogRecPtr);
     706            4 :         pa_set_fileset_state(MyParallelShared, FS_EMPTY);
     707              :     }
     708              : 
     709            8 :     return true;
     710              : }
     711              : 
     712              : /*
     713              :  * Interrupt handler for main loop of parallel apply worker.
     714              :  */
     715              : static void
     716        64133 : ProcessParallelApplyInterrupts(void)
     717              : {
     718        64133 :     CHECK_FOR_INTERRUPTS();
     719              : 
     720        64129 :     if (ShutdownRequestPending)
     721              :     {
     722            5 :         ereport(LOG,
     723              :                 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
     724              :                         MySubscription->name)));
     725              : 
     726            5 :         proc_exit(0);
     727              :     }
     728              : 
     729        64124 :     if (ConfigReloadPending)
     730              :     {
     731            4 :         ConfigReloadPending = false;
     732            4 :         ProcessConfigFile(PGC_SIGHUP);
     733              :     }
     734        64124 : }
     735              : 
     736              : /* Parallel apply worker main loop. */
     737              : static void
     738           12 : LogicalParallelApplyLoop(shm_mq_handle *mqh)
     739              : {
     740              :     shm_mq_result shmq_res;
     741              :     ErrorContextCallback errcallback;
     742           12 :     MemoryContext oldcxt = CurrentMemoryContext;
     743              : 
     744              :     /*
     745              :      * Init the ApplyMessageContext which we clean up after each replication
     746              :      * protocol message.
     747              :      */
     748           12 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
     749              :                                                 "ApplyMessageContext",
     750              :                                                 ALLOCSET_DEFAULT_SIZES);
     751              : 
     752              :     /*
     753              :      * Push apply error context callback. Fields will be filled while applying
     754              :      * a change.
     755              :      */
     756           12 :     errcallback.callback = apply_error_callback;
     757           12 :     errcallback.previous = error_context_stack;
     758           12 :     error_context_stack = &errcallback;
     759              : 
     760              :     for (;;)
     761        64121 :     {
     762              :         void       *data;
     763              :         Size        len;
     764              : 
     765        64133 :         ProcessParallelApplyInterrupts();
     766              : 
     767              :         /* Ensure we are reading the data into our memory context. */
     768        64124 :         MemoryContextSwitchTo(ApplyMessageContext);
     769              : 
     770        64124 :         shmq_res = shm_mq_receive(mqh, &len, &data, true);
     771              : 
     772        64124 :         if (shmq_res == SHM_MQ_SUCCESS)
     773              :         {
     774              :             StringInfoData s;
     775              :             int         c;
     776              : 
     777        64068 :             if (len == 0)
     778            0 :                 elog(ERROR, "invalid message length");
     779              : 
     780        64068 :             initReadOnlyStringInfo(&s, data, len);
     781              : 
     782              :             /*
      783              :              * The first byte of a message sent from the leader apply worker
      784              :              * to a parallel apply worker can only be PqReplMsg_WALData.
     785              :              */
     786        64068 :             c = pq_getmsgbyte(&s);
     787        64068 :             if (c != PqReplMsg_WALData)
     788            0 :                 elog(ERROR, "unexpected message \"%c\"", c);
     789              : 
     790              :             /*
     791              :              * Ignore statistics fields that have been updated by the leader
     792              :              * apply worker.
     793              :              *
     794              :              * XXX We can avoid sending the statistics fields from the leader
     795              :              * apply worker but for that, it needs to rebuild the entire
     796              :              * message by removing these fields which could be more work than
     797              :              * simply ignoring these fields in the parallel apply worker.
     798              :              */
     799        64068 :             s.cursor += SIZE_STATS_MESSAGE;
     800              : 
     801        64068 :             apply_dispatch(&s);
     802              :         }
     803           56 :         else if (shmq_res == SHM_MQ_WOULD_BLOCK)
     804              :         {
     805              :             /* Replay the changes from the file, if any. */
     806           56 :             if (!pa_process_spooled_messages_if_required())
     807              :             {
     808              :                 int         rc;
     809              : 
     810              :                 /* Wait for more work. */
     811           48 :                 rc = WaitLatch(MyLatch,
     812              :                                WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     813              :                                1000L,
     814              :                                WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
     815              : 
     816           48 :                 if (rc & WL_LATCH_SET)
     817           43 :                     ResetLatch(MyLatch);
     818              :             }
     819              :         }
     820              :         else
     821              :         {
     822              :             Assert(shmq_res == SHM_MQ_DETACHED);
     823              : 
     824            0 :             ereport(ERROR,
     825              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     826              :                      errmsg("lost connection to the logical replication apply worker")));
     827              :         }
     828              : 
     829        64121 :         MemoryContextReset(ApplyMessageContext);
     830        64121 :         MemoryContextSwitchTo(oldcxt);
     831              :     }
     832              : 
     833              :     /* Pop the error context stack. */
     834              :     error_context_stack = errcallback.previous;
     835              : 
     836              :     MemoryContextSwitchTo(oldcxt);
     837              : }
     838              : 
     839              : /*
     840              :  * Make sure the leader apply worker tries to read from our error queue one more
     841              :  * time. This guards against the case where we exit uncleanly without sending
     842              :  * an ErrorResponse, for example because some code calls proc_exit directly.
     843              :  *
     844              :  * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
     845              :  * if any. See ParallelWorkerShutdown for details.
     846              :  */
     847              : static void
     848           12 : pa_shutdown(int code, Datum arg)
     849              : {
     850           12 :     SendProcSignal(MyLogicalRepWorker->leader_pid,
     851              :                    PROCSIG_PARALLEL_APPLY_MESSAGE,
     852              :                    INVALID_PROC_NUMBER);
     853              : 
     854           12 :     dsm_detach((dsm_segment *) DatumGetPointer(arg));
     855           12 : }
     856              : 
     857              : /*
     858              :  * Parallel apply worker entry point.
     859              :  */
     860              : void
     861           12 : ParallelApplyWorkerMain(Datum main_arg)
     862              : {
     863              :     ParallelApplyWorkerShared *shared;
     864              :     dsm_handle  handle;
     865              :     dsm_segment *seg;
     866              :     shm_toc    *toc;
     867              :     shm_mq     *mq;
     868              :     shm_mq_handle *mqh;
     869              :     shm_mq_handle *error_mqh;
     870              :     ReplOriginId originid;
     871           12 :     int         worker_slot = DatumGetInt32(main_arg);
     872              :     char        originname[NAMEDATALEN];
     873              : 
     874           12 :     InitializingApplyWorker = true;
     875              : 
     876              :     /*
     877              :      * Setup signal handling.
     878              :      *
      879              :      * Note: We intentionally use SIGUSR2 to trigger a graceful shutdown
     880              :      * initiated by the leader apply worker. This helps to differentiate it
     881              :      * from the case where we abort the current transaction and exit on
     882              :      * receiving SIGTERM.
     883              :      */
     884           12 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     885           12 :     pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
     886           12 :     BackgroundWorkerUnblockSignals();
     887              : 
     888              :     /*
     889              :      * Attach to the dynamic shared memory segment for the parallel apply, and
     890              :      * find its table of contents.
     891              :      *
      892              :      * Like parallel query, we don't need a resource owner by this time. See
     893              :      * ParallelWorkerMain.
     894              :      */
     895           12 :     memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
     896           12 :     seg = dsm_attach(handle);
     897           12 :     if (!seg)
     898            0 :         ereport(ERROR,
     899              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     900              :                  errmsg("could not map dynamic shared memory segment")));
     901              : 
     902           12 :     toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
     903           12 :     if (!toc)
     904            0 :         ereport(ERROR,
     905              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     906              :                  errmsg("invalid magic number in dynamic shared memory segment")));
     907              : 
     908              :     /* Look up the shared information. */
     909           12 :     shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
     910           12 :     MyParallelShared = shared;
     911              : 
     912              :     /*
     913              :      * Attach to the message queue.
     914              :      */
     915           12 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
     916           12 :     shm_mq_set_receiver(mq, MyProc);
     917           12 :     mqh = shm_mq_attach(mq, seg, NULL);
     918              : 
     919              :     /*
     920              :      * Primary initialization is complete. Now, we can attach to our slot.
     921              :      * This is to ensure that the leader apply worker does not write data to
     922              :      * the uninitialized memory queue.
     923              :      */
     924           12 :     logicalrep_worker_attach(worker_slot);
     925              : 
     926              :     /*
     927              :      * Register the shutdown callback after we are attached to the worker
     928              :      * slot. This is to ensure that MyLogicalRepWorker remains valid when this
     929              :      * callback is invoked.
     930              :      */
     931           12 :     before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
     932              : 
     933           12 :     SpinLockAcquire(&MyParallelShared->mutex);
     934           12 :     MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
     935           12 :     MyParallelShared->logicalrep_worker_slot_no = worker_slot;
     936           12 :     SpinLockRelease(&MyParallelShared->mutex);
     937              : 
     938              :     /*
     939              :      * Attach to the error queue.
     940              :      */
     941           12 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
     942           12 :     shm_mq_set_sender(mq, MyProc);
     943           12 :     error_mqh = shm_mq_attach(mq, seg, NULL);
     944              : 
     945           12 :     pq_redirect_to_shm_mq(seg, error_mqh);
     946           12 :     pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
     947              :                            INVALID_PROC_NUMBER);
     948              : 
     949           12 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
     950           12 :         MyLogicalRepWorker->reply_time = 0;
     951              : 
     952           12 :     InitializeLogRepWorker();
     953              : 
     954           12 :     InitializingApplyWorker = false;
     955              : 
     956              :     /* Setup replication origin tracking. */
     957           12 :     StartTransactionCommand();
     958           12 :     ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
     959              :                                        originname, sizeof(originname));
     960           12 :     originid = replorigin_by_name(originname, false);
     961              : 
     962              :     /*
     963              :      * The parallel apply worker doesn't need to monopolize this replication
      964              :      * origin, which was already acquired by its leader process.
     965              :      */
     966           12 :     replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
     967           12 :     replorigin_xact_state.origin = originid;
     968           12 :     CommitTransactionCommand();
     969              : 
     970              :     /*
     971              :      * Setup callback for syscache so that we know when something changes in
     972              :      * the subscription relation state.
     973              :      */
     974           12 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
     975              :                                   InvalidateSyncingRelStates,
     976              :                                   (Datum) 0);
     977              : 
     978           12 :     set_apply_error_context_origin(originname);
     979              : 
     980           12 :     LogicalParallelApplyLoop(mqh);
     981              : 
     982              :     /*
      983              :      * The parallel apply worker must not get here: it stops only when it
      984              :      * receives a SIGTERM or SIGUSR2 from the leader, or SIGINT from itself,
      985              :      * or when there is an error. None of these cases allows control to
      986              :      * return here.
     987              :      */
     988              :     Assert(false);
     989            0 : }
     990              : 
     991              : /*
     992              :  * Handle receipt of an interrupt indicating a parallel apply worker message.
     993              :  *
     994              :  * Note: this is called within a signal handler! All we can do is set a flag
     995              :  * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
     996              :  * ProcessParallelApplyMessages().
     997              :  */
     998              : void
     999           13 : HandleParallelApplyMessageInterrupt(void)
    1000              : {
    1001           13 :     InterruptPending = true;
    1002           13 :     ParallelApplyMessagePending = true;
    1003           13 :     SetLatch(MyLatch);
    1004           13 : }
    1005              : 
    1006              : /*
    1007              :  * Process a single protocol message received from a single parallel apply
    1008              :  * worker.
    1009              :  */
    1010              : static void
    1011            2 : ProcessParallelApplyMessage(StringInfo msg)
    1012              : {
    1013              :     char        msgtype;
    1014              : 
    1015            2 :     msgtype = pq_getmsgbyte(msg);
    1016              : 
    1017            2 :     switch (msgtype)
    1018              :     {
    1019            2 :         case PqMsg_ErrorResponse:
    1020              :             {
    1021              :                 ErrorData   edata;
    1022              : 
    1023              :                 /* Parse ErrorResponse. */
    1024            2 :                 pq_parse_errornotice(msg, &edata);
    1025              : 
    1026              :                 /*
    1027              :                  * If desired, add a context line to show that this is a
    1028              :                  * message propagated from a parallel apply worker. Otherwise,
    1029              :                  * it can sometimes be confusing to understand what actually
    1030              :                  * happened.
    1031              :                  */
    1032            2 :                 if (edata.context)
    1033            2 :                     edata.context = psprintf("%s\n%s", edata.context,
    1034              :                                              _("logical replication parallel apply worker"));
    1035              :                 else
    1036            0 :                     edata.context = pstrdup(_("logical replication parallel apply worker"));
    1037              : 
    1038              :                 /*
    1039              :                  * Context beyond that should use the error context callbacks
    1040              :                  * that were in effect in LogicalRepApplyLoop().
    1041              :                  */
    1042            2 :                 error_context_stack = apply_error_context_stack;
    1043              : 
    1044              :                 /*
    1045              :                  * The actual error must have been reported by the parallel
    1046              :                  * apply worker.
    1047              :                  */
    1048            2 :                 ereport(ERROR,
    1049              :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1050              :                          errmsg("logical replication parallel apply worker exited due to error"),
    1051              :                          errcontext("%s", edata.context)));
    1052              :             }
    1053              : 
    1054              :             /*
    1055              :              * Don't need to do anything about NoticeResponse and
    1056              :              * NotificationResponse as the logical replication worker doesn't
    1057              :              * need to send messages to the client.
    1058              :              */
    1059            0 :         case PqMsg_NoticeResponse:
    1060              :         case PqMsg_NotificationResponse:
    1061            0 :             break;
    1062              : 
    1063            0 :         default:
    1064            0 :             elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
    1065              :                  msgtype, msg->len);
    1066              :     }
    1067            0 : }
    1068              : 
    1069              : /*
    1070              :  * Handle any queued protocol messages received from parallel apply workers.
    1071              :  */
    1072              : void
    1073            7 : ProcessParallelApplyMessages(void)
    1074              : {
    1075              :     ListCell   *lc;
    1076              :     MemoryContext oldcontext;
    1077              : 
    1078              :     static MemoryContext hpam_context = NULL;
    1079              : 
    1080              :     /*
    1081              :      * This is invoked from ProcessInterrupts(), and since some of the
    1082              :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
    1083              :      * for recursive calls if more signals are received while this runs. It's
    1084              :      * unclear that recursive entry would be safe, and it doesn't seem useful
    1085              :      * even if it is safe, so let's block interrupts until done.
    1086              :      */
    1087            7 :     HOLD_INTERRUPTS();
    1088              : 
    1089              :     /*
    1090              :      * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
    1091              :      * don't want to risk leaking data into long-lived contexts, so let's do
    1092              :      * our work here in a private context that we can reset on each use.
    1093              :      */
    1094            7 :     if (!hpam_context)          /* first time through? */
    1095            6 :         hpam_context = AllocSetContextCreate(TopMemoryContext,
    1096              :                                              "ProcessParallelApplyMessages",
    1097              :                                              ALLOCSET_DEFAULT_SIZES);
    1098              :     else
    1099            1 :         MemoryContextReset(hpam_context);
    1100              : 
    1101            7 :     oldcontext = MemoryContextSwitchTo(hpam_context);
    1102              : 
    1103            7 :     ParallelApplyMessagePending = false;
    1104              : 
    1105           13 :     foreach(lc, ParallelApplyWorkerPool)
    1106              :     {
    1107              :         shm_mq_result res;
    1108              :         Size        nbytes;
    1109              :         void       *data;
    1110            8 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
    1111              : 
    1112              :         /*
    1113              :          * The leader will detach from the error queue and set it to NULL
    1114              :          * before preparing to stop all parallel apply workers, so we don't
    1115              :          * need to handle error messages anymore. See
    1116              :          * logicalrep_worker_detach.
    1117              :          */
    1118            8 :         if (!winfo->error_mq_handle)
    1119            6 :             continue;
    1120              : 
    1121            3 :         res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
    1122              : 
    1123            3 :         if (res == SHM_MQ_WOULD_BLOCK)
    1124            1 :             continue;
    1125            2 :         else if (res == SHM_MQ_SUCCESS)
    1126              :         {
    1127              :             StringInfoData msg;
    1128              : 
    1129            2 :             initStringInfo(&msg);
    1130            2 :             appendBinaryStringInfo(&msg, data, nbytes);
    1131            2 :             ProcessParallelApplyMessage(&msg);
    1132            0 :             pfree(msg.data);
    1133              :         }
    1134              :         else
    1135            0 :             ereport(ERROR,
    1136              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1137              :                      errmsg("lost connection to the logical replication parallel apply worker")));
    1138              :     }
    1139              : 
    1140            5 :     MemoryContextSwitchTo(oldcontext);
    1141              : 
    1142              :     /* Might as well clear the context on our way out */
    1143            5 :     MemoryContextReset(hpam_context);
    1144              : 
    1145            5 :     RESUME_INTERRUPTS();
    1146            5 : }
    1147              : 
    1148              : /*
    1149              :  * Send the data to the specified parallel apply worker via a shared-memory
    1150              :  * queue.
    1151              :  *
    1152              :  * Returns false if the attempt to send data via shared memory times out, true
    1153              :  * otherwise.
    1154              :  */
    1155              : bool
    1156        68906 : pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
    1157              : {
    1158              :     int         rc;
    1159              :     shm_mq_result result;
    1160        68906 :     TimestampTz startTime = 0;
    1161              : 
    1162              :     Assert(!IsTransactionState());
    1163              :     Assert(!winfo->serialize_changes);
    1164              : 
    1165              :     /*
    1166              :      * We don't try to send data to the parallel worker in 'immediate' mode;
    1167              :      * this mode is primarily used for testing purposes.
    1168              :      */
    1169        68906 :     if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
    1170            4 :         return false;
    1171              : 
    1172              : /*
    1173              :  * This timeout is a bit arbitrary but testing revealed that it is sufficient
    1174              :  * to send the message unless the parallel apply worker is waiting on some
    1175              :  * lock or there is a serious resource crunch. Refer to the comments atop
    1176              :  * this file for why we send the message in a non-blocking way.
    1177              :  */
    1178              : #define SHM_SEND_RETRY_INTERVAL_MS 1000
    1179              : #define SHM_SEND_TIMEOUT_MS     (10000 - SHM_SEND_RETRY_INTERVAL_MS)
    1180              : 
    1181              :     for (;;)
    1182              :     {
    1183        68902 :         result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
    1184              : 
    1185        68902 :         if (result == SHM_MQ_SUCCESS)
    1186        68902 :             return true;
    1187            0 :         else if (result == SHM_MQ_DETACHED)
    1188            0 :             ereport(ERROR,
    1189              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1190              :                      errmsg("could not send data to shared-memory queue")));
    1191              : 
    1192              :         Assert(result == SHM_MQ_WOULD_BLOCK);
    1193              : 
    1194              :         /* Wait before retrying. */
    1195            0 :         rc = WaitLatch(MyLatch,
    1196              :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1197              :                        SHM_SEND_RETRY_INTERVAL_MS,
    1198              :                        WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
    1199              : 
    1200            0 :         if (rc & WL_LATCH_SET)
    1201              :         {
    1202            0 :             ResetLatch(MyLatch);
    1203            0 :             CHECK_FOR_INTERRUPTS();
    1204              :         }
    1205              : 
    1206            0 :         if (startTime == 0)
    1207            0 :             startTime = GetCurrentTimestamp();
    1208            0 :         else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
    1209              :                                             SHM_SEND_TIMEOUT_MS))
    1210            0 :             return false;
    1211              :     }
    1212              : }
    1213              : 
    1214              : /*
    1215              :  * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
    1216              :  * that the current data and any subsequent data for this transaction will be
    1217              :  * serialized to a file. This is done to prevent possible deadlocks with
    1218              :  * another parallel apply worker (refer to the comments atop this file).
    1219              :  */
    1220              : void
    1221            4 : pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
    1222              :                                bool stream_locked)
    1223              : {
    1224            4 :     ereport(LOG,
    1225              :             (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
    1226              :                     winfo->shared->xid)));
    1227              : 
    1228              :     /*
    1229              :      * The parallel apply worker could be stuck for some reason (say, waiting
    1230              :      * on a lock held by another backend), so stop trying to send data directly to
    1231              :      * it and start serializing data to the file instead.
    1232              :      */
    1233            4 :     winfo->serialize_changes = true;
    1234              : 
    1235              :     /* Initialize the stream fileset. */
    1236            4 :     stream_start_internal(winfo->shared->xid, true);
    1237              : 
    1238              :     /*
    1239              :      * Acquire the stream lock if not already held, so that the parallel
    1240              :      * apply worker will wait for the leader to release it at the end of
    1241              :      * the transaction.
    1242              :      */
    1243            4 :     if (!stream_locked)
    1244            4 :         pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
    1245              : 
    1246            4 :     pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
    1247            4 : }
    1248              : 
    1249              : /*
    1250              :  * Wait until the parallel apply worker's transaction state has reached or
    1251              :  * exceeded the given xact_state.
    1252              :  */
    1253              : static void
    1254           27 : pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
    1255              :                        ParallelTransState xact_state)
    1256              : {
    1257              :     for (;;)
    1258              :     {
    1259              :         /*
    1260              :          * Stop if the transaction state has reached or exceeded the given
    1261              :          * xact_state.
    1262              :          */
    1263          368 :         if (pa_get_xact_state(winfo->shared) >= xact_state)
    1264           27 :             break;
    1265              : 
    1266              :         /* Wait to be signalled. */
    1267          341 :         (void) WaitLatch(MyLatch,
    1268              :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1269              :                          10L,
    1270              :                          WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
    1271              : 
    1272              :         /* Reset the latch so we don't spin. */
    1273          341 :         ResetLatch(MyLatch);
    1274              : 
    1275              :         /* An interrupt may have occurred while we were waiting. */
    1276          341 :         CHECK_FOR_INTERRUPTS();
    1277              :     }
    1278           27 : }
    1279              : 
    1280              : /*
    1281              :  * Wait until the parallel apply worker's transaction finishes.
    1282              :  */
    1283              : static void
    1284           27 : pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
    1285              : {
    1286              :     /*
    1287              :      * Wait until the parallel apply worker has set the state to
    1288              :      * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
    1289              :      * lock. This prevents the leader apply worker from acquiring the
    1290              :      * transaction lock before the parallel apply worker does.
    1291              :      */
    1292           27 :     pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
    1293              : 
    1294              :     /*
    1295              :      * Wait for the transaction lock to be released. This is required to
    1296              :      * detect deadlock among leader and parallel apply workers. Refer to the
    1297              :      * comments atop this file.
    1298              :      */
    1299           27 :     pa_lock_transaction(winfo->shared->xid, AccessShareLock);
    1300           25 :     pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
    1301              : 
    1302              :     /*
    1303              :      * Check that the state has become PARALLEL_TRANS_FINISHED; the parallel
    1304              :      * apply worker may have failed while applying changes, causing the lock
    1305              :      * to be released prematurely.
    1306              :      */
    1307           25 :     if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
    1308            0 :         ereport(ERROR,
    1309              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1310              :                  errmsg("lost connection to the logical replication parallel apply worker")));
    1311           25 : }
    1312              : 
    1313              : /*
    1314              :  * Set the transaction state for a given parallel apply worker.
    1315              :  */
    1316              : void
    1317           54 : pa_set_xact_state(ParallelApplyWorkerShared *wshared,
    1318              :                   ParallelTransState xact_state)
    1319              : {
    1320           54 :     SpinLockAcquire(&wshared->mutex);
    1321           54 :     wshared->xact_state = xact_state;
    1322           54 :     SpinLockRelease(&wshared->mutex);
    1323           54 : }
    1324              : 
    1325              : /*
    1326              :  * Get the transaction state for a given parallel apply worker.
    1327              :  */
    1328              : static ParallelTransState
    1329          393 : pa_get_xact_state(ParallelApplyWorkerShared *wshared)
    1330              : {
    1331              :     ParallelTransState xact_state;
    1332              : 
    1333          393 :     SpinLockAcquire(&wshared->mutex);
    1334          393 :     xact_state = wshared->xact_state;
    1335          393 :     SpinLockRelease(&wshared->mutex);
    1336              : 
    1337          393 :     return xact_state;
    1338              : }
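The state accessors above take the spinlock for both reads and writes so that leader and worker always see a consistent value. A minimal standalone model of that pattern, with a pthread mutex standing in for PostgreSQL's slock_t (the `XS_*` enum names and `SharedState` struct here are simplified stand-ins, not the real ParallelApplyWorkerShared layout):

```c
#include <assert.h>
#include <pthread.h>

/* Simplified transaction states, ordered so that '>=' comparisons work
 * the same way pa_wait_for_xact_state() uses them. */
typedef enum
{
    XS_UNKNOWN,
    XS_STARTED,
    XS_FINISHED
} XactState;

typedef struct
{
    pthread_mutex_t mutex;      /* stands in for slock_t */
    XactState   xact_state;
} SharedState;

static void
set_xact_state(SharedState *s, XactState st)
{
    pthread_mutex_lock(&s->mutex);
    s->xact_state = st;
    pthread_mutex_unlock(&s->mutex);
}

static XactState
get_xact_state(SharedState *s)
{
    XactState   st;

    pthread_mutex_lock(&s->mutex);
    st = s->xact_state;
    pthread_mutex_unlock(&s->mutex);
    return st;
}
```

Because states only advance, a waiter can poll `get_xact_state(s) >= target` and sleep in between, which is exactly the loop pa_wait_for_xact_state() implements with WaitLatch().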
    1339              : 
    1340              : /*
    1341              :  * Cache the parallel apply worker information.
    1342              :  */
    1343              : void
    1344          510 : pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
    1345              : {
    1346          510 :     stream_apply_worker = winfo;
    1347          510 : }
    1348              : 
    1349              : /*
    1350              :  * Form a unique savepoint name for the streaming transaction.
    1351              :  *
    1352              :  * Note that different subscriptions for publications on different nodes can
    1353              :  * receive the same remote xid, so we include the subscription id as well.
    1354              :  *
    1355              :  * Returns the name in the supplied buffer.
    1356              :  */
    1357              : static void
    1358           27 : pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
    1359              : {
    1360           27 :     snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
    1361           27 : }
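The comment above explains why the savepoint name combines the subscription OID with the remote xid: two subscriptions can receive the same remote xid, and the OID keeps their savepoint names distinct. A self-contained sketch of the same formatting (the function name and example OIDs here are illustrative):

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Model of pa_savepoint_name(): "pg_sp_<suboid>_<xid>", truncated to the
 * supplied buffer size by snprintf().
 */
static void
savepoint_name(unsigned int suboid, unsigned int xid,
               char *spname, size_t szsp)
{
    snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
}
```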
    1362              : 
    1363              : /*
    1364              :  * Define a savepoint for a subxact in parallel apply worker if needed.
    1365              :  *
    1366              :  * The parallel apply worker can figure out if a new subtransaction was
    1367              :  * started by checking if the new change arrived with a different xid. In that
    1368              :  * case we define a named savepoint, so that we are able to roll back to it
    1369              :  * if required.
    1370              :  */
    1371              : void
    1372        68584 : pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
    1373              : {
    1374        68584 :     if (current_xid != top_xid &&
    1375           52 :         !list_member_xid(subxactlist, current_xid))
    1376              :     {
    1377              :         MemoryContext oldctx;
    1378              :         char        spname[NAMEDATALEN];
    1379              : 
    1380           17 :         pa_savepoint_name(MySubscription->oid, current_xid,
    1381              :                           spname, sizeof(spname));
    1382              : 
    1383           17 :         elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
    1384              : 
    1385              :         /* We must be in a transaction block to define the SAVEPOINT. */
    1386           17 :         if (!IsTransactionBlock())
    1387              :         {
    1388            5 :             if (!IsTransactionState())
    1389            0 :                 StartTransactionCommand();
    1390              : 
    1391            5 :             BeginTransactionBlock();
    1392            5 :             CommitTransactionCommand();
    1393              :         }
    1394              : 
    1395           17 :         DefineSavepoint(spname);
    1396              : 
    1397              :         /*
    1398              :          * CommitTransactionCommand is needed to start a subtransaction after
    1399              :          * issuing a SAVEPOINT inside a transaction block (see
    1400              :          * StartSubTransaction()).
    1401              :          */
    1402           17 :         CommitTransactionCommand();
    1403              : 
    1404           17 :         oldctx = MemoryContextSwitchTo(TopTransactionContext);
    1405           17 :         subxactlist = lappend_xid(subxactlist, current_xid);
    1406           17 :         MemoryContextSwitchTo(oldctx);
    1407              :     }
    1408        68584 : }
    1409              : 
    1410              : /* Reset the list that maintains subtransactions. */
    1411              : void
    1412           25 : pa_reset_subtrans(void)
    1413              : {
    1414              :     /*
    1415              :      * We don't need to free this explicitly as the allocated memory will be
    1416              :      * freed at the transaction end.
    1417              :      */
    1418           25 :     subxactlist = NIL;
    1419           25 : }
    1420              : 
    1421              : /*
    1422              :  * Handle STREAM ABORT message when the transaction was applied in a parallel
    1423              :  * apply worker.
    1424              :  */
    1425              : void
    1426           12 : pa_stream_abort(LogicalRepStreamAbortData *abort_data)
    1427              : {
    1428           12 :     TransactionId xid = abort_data->xid;
    1429           12 :     TransactionId subxid = abort_data->subxid;
    1430              : 
    1431              :     /*
    1432              :      * Update origin state so we can restart streaming from the correct
    1433              :      * position in case of a crash.
    1434              :      */
    1435           12 :     replorigin_xact_state.origin_lsn = abort_data->abort_lsn;
    1436           12 :     replorigin_xact_state.origin_timestamp = abort_data->abort_time;
    1437              : 
    1438              :     /*
    1439              :      * If the two XIDs are the same, it's in fact an abort of the toplevel
    1440              :      * xact, so just free the subxactlist.
    1441              :      */
    1442           12 :     if (subxid == xid)
    1443              :     {
    1444            2 :         pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    1445              : 
    1446              :         /*
    1447              :          * Release the lock as we might be processing an empty streaming
    1448              :          * transaction in which case the lock won't be released during
    1449              :          * transaction rollback.
    1450              :          *
    1451              :          * Note that it's ok to release the transaction lock before aborting
    1452              :          * the transaction because even if the parallel apply worker dies due
    1453              :          * to crash or some other reason, such a transaction would still be
    1454              :          * considered aborted.
    1455              :          */
    1456            2 :         pa_unlock_transaction(xid, AccessExclusiveLock);
    1457              : 
    1458            2 :         AbortCurrentTransaction();
    1459              : 
    1460            2 :         if (IsTransactionBlock())
    1461              :         {
    1462            1 :             EndTransactionBlock(false);
    1463            1 :             CommitTransactionCommand();
    1464              :         }
    1465              : 
    1466            2 :         pa_reset_subtrans();
    1467              : 
    1468            2 :         pgstat_report_activity(STATE_IDLE, NULL);
    1469              :     }
    1470              :     else
    1471              :     {
    1472              :         /* OK, so it's a subxact. Rollback to the savepoint. */
    1473              :         int         i;
    1474              :         char        spname[NAMEDATALEN];
    1475              : 
    1476           10 :         pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
    1477              : 
    1478           10 :         elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
    1479              : 
    1480              :         /*
    1481              :          * Search the subxactlist, determine the offset tracked for the
    1482              :          * subxact, and truncate the list.
    1483              :          *
    1484              :          * Note that for an empty sub-transaction we won't find the subxid
    1485              :          * here.
    1486              :          */
    1487           12 :         for (i = list_length(subxactlist) - 1; i >= 0; i--)
    1488              :         {
    1489           11 :             TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
    1490              : 
    1491           11 :             if (xid_tmp == subxid)
    1492              :             {
    1493            9 :                 RollbackToSavepoint(spname);
    1494            9 :                 CommitTransactionCommand();
    1495            9 :                 subxactlist = list_truncate(subxactlist, i);
    1496            9 :                 break;
    1497              :             }
    1498              :         }
    1499              :     }
    1500           12 : }
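The subxact branch above scans subxactlist from the tail, rolls back to the matching savepoint, and truncates the list at that offset; an empty sub-transaction is simply not found and the list is left alone. The search-and-truncate arithmetic can be modeled on a plain array (the function name is hypothetical; the rollback itself is omitted):

```c
#include <assert.h>

/*
 * Model of the subxactlist handling in pa_stream_abort(): scan from the
 * tail for the aborted subxact's xid and return the new list length,
 * i.e. the offset passed to list_truncate() in the original. If the xid
 * is absent -- an empty sub-transaction never entered the list -- the
 * length is unchanged.
 */
static int
truncate_at_subxact(const unsigned int *list, int len, unsigned int subxid)
{
    for (int i = len - 1; i >= 0; i--)
    {
        if (list[i] == subxid)
            return i;           /* keep entries 0 .. i-1 */
    }
    return len;
}
```

Aborting a subxact thus also discards any subtransactions nested inside it, since they sit after it in the list.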
    1501              : 
    1502              : /*
    1503              :  * Set the fileset state for a particular parallel apply worker. The fileset
    1504              :  * will be set once the leader worker has serialized all changes to the file
    1505              :  * so that it can be used by the parallel apply worker.
    1506              :  */
    1507              : void
    1508           16 : pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
    1509              :                      PartialFileSetState fileset_state)
    1510              : {
    1511           16 :     SpinLockAcquire(&wshared->mutex);
    1512           16 :     wshared->fileset_state = fileset_state;
    1513              : 
    1514           16 :     if (fileset_state == FS_SERIALIZE_DONE)
    1515              :     {
    1516              :         Assert(am_leader_apply_worker());
    1517              :         Assert(MyLogicalRepWorker->stream_fileset);
    1518            4 :         wshared->fileset = *MyLogicalRepWorker->stream_fileset;
    1519              :     }
    1520              : 
    1521           16 :     SpinLockRelease(&wshared->mutex);
    1522           16 : }
    1523              : 
    1524              : /*
    1525              :  * Get the fileset state for the current parallel apply worker.
    1526              :  */
    1527              : static PartialFileSetState
    1528           72 : pa_get_fileset_state(void)
    1529              : {
    1530              :     PartialFileSetState fileset_state;
    1531              : 
    1532              :     Assert(am_parallel_apply_worker());
    1533              : 
    1534           72 :     SpinLockAcquire(&MyParallelShared->mutex);
    1535           72 :     fileset_state = MyParallelShared->fileset_state;
    1536           72 :     SpinLockRelease(&MyParallelShared->mutex);
    1537              : 
    1538           72 :     return fileset_state;
    1539              : }
    1540              : 
    1541              : /*
    1542              :  * Helper functions to acquire and release a lock for each stream block.
    1543              :  *
    1544              :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
    1545              :  * stream lock.
    1546              :  *
    1547              :  * Refer to the comments atop this file to see how the stream lock is used.
    1548              :  */
    1549              : void
    1550          279 : pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
    1551              : {
    1552          279 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1553              :                                    PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1554          277 : }
    1555              : 
    1556              : void
    1557          275 : pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
    1558              : {
    1559          275 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1560              :                                      PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1561          275 : }
    1562              : 
    1563              : /*
    1564              :  * Helper functions to acquire and release a lock for each local transaction
    1565              :  * apply.
    1566              :  *
    1567              :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
    1568              :  * transaction lock.
    1569              :  *
    1570              :  * Note that all the callers must pass a remote transaction ID instead of a
    1571              :  * local transaction ID as xid. This is because the local transaction ID will
    1572              :  * only be assigned while applying the first change in the parallel apply
    1573              :  * worker, but it's possible that this first change is blocked by a
    1574              :  * concurrently executing transaction in another parallel apply worker. Since
    1575              :  * the local transaction ID can only be communicated to the leader after
    1576              :  * applying the first change, the leader wouldn't be able to use this lock to
    1577              :  * wait after sending the xact finish command.
    1578              :  *
    1579              :  * Refer to the comments atop this file to see how the transaction lock is
    1580              :  * used.
    1581              :  */
    1582              : void
    1583           56 : pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
    1584              : {
    1585           56 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1586              :                                    PARALLEL_APPLY_LOCK_XACT, lockmode);
    1587           54 : }
    1588              : 
    1589              : void
    1590           50 : pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
    1591              : {
    1592           50 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1593              :                                      PARALLEL_APPLY_LOCK_XACT, lockmode);
    1594           50 : }
    1595              : 
    1596              : /*
    1597              :  * Decrement the number of pending streaming blocks and wait on the stream lock
    1598              :  * if there is no pending block available.
    1599              :  */
    1600              : void
    1601          254 : pa_decr_and_wait_stream_block(void)
    1602              : {
    1603              :     Assert(am_parallel_apply_worker());
    1604              : 
    1605              :     /*
    1606              :      * It is only possible to not have any pending stream chunks when we are
    1607              :      * applying spooled messages.
    1608              :      */
    1609          254 :     if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
    1610              :     {
    1611           16 :         if (pa_has_spooled_message_pending())
    1612           16 :             return;
    1613              : 
    1614            0 :         elog(ERROR, "invalid pending streaming chunk 0");
    1615              :     }
    1616              : 
    1617          238 :     if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
    1618              :     {
    1619           26 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
    1620           24 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
    1621              :     }
    1622              : }
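The counting logic above can be isolated with C11 atomics: the worker should block on the stream lock exactly when its decrement consumes the last pending chunk. A hedged sketch (`decr_and_check_wait` is a hypothetical name; `spooled` stands in for pa_has_spooled_message_pending(), and the lock acquisition itself is left out):

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/*
 * Model of pa_decr_and_wait_stream_block()'s counting. Returns true when
 * the caller should block on the stream lock (the last pending chunk was
 * just consumed). A zero count on entry is only legal while applying
 * spooled messages.
 */
static bool
decr_and_check_wait(atomic_uint *pending, bool spooled)
{
    if (atomic_load(pending) == 0)
    {
        assert(spooled);        /* elog(ERROR, ...) in the original */
        return false;
    }

    /* fetch_sub returns the old value: old == 1 means the count hit 0,
     * matching pg_atomic_sub_fetch_u32(...) == 0 in the original. */
    return atomic_fetch_sub(pending, 1) == 1;
}
```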
    1623              : 
    1624              : /*
    1625              :  * Finish processing the streaming transaction in the leader apply worker.
    1626              :  */
    1627              : void
    1628           27 : pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
    1629              : {
    1630              :     Assert(am_leader_apply_worker());
    1631              : 
    1632              :     /*
    1633              :      * Unlock the shared object lock so that the parallel apply worker can
    1634              :      * continue to receive and apply changes.
    1635              :      */
    1636           27 :     pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
    1637              : 
    1638              :     /*
    1639              :      * Wait for that worker to finish. This is necessary to maintain commit
    1640              :      * order, which avoids failures due to transaction dependencies and
    1641              :      * deadlocks.
    1642              :      */
    1643           27 :     pa_wait_for_xact_finish(winfo);
    1644              : 
    1645           25 :     if (XLogRecPtrIsValid(remote_lsn))
    1646           23 :         store_flush_position(remote_lsn, winfo->shared->last_commit_end);
    1647              : 
    1648           25 :     pa_free_worker(winfo);
    1649           25 : }
        

Generated by: LCOV version 2.0-1