LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 88.8 % 562 499
Test Date: 2026-04-06 14:16:21 Functions: 100.0 % 37 37
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  * launcher.c
       3              :  *     PostgreSQL logical replication worker launcher process
       4              :  *
       5              :  * Copyright (c) 2016-2026, PostgreSQL Global Development Group
       6              :  *
       7              :  * IDENTIFICATION
       8              :  *    src/backend/replication/logical/launcher.c
       9              :  *
      10              :  * NOTES
      11              :  *    This module contains the logical replication worker launcher which
      12              :  *    uses the background worker infrastructure to start the logical
      13              :  *    replication workers for every enabled subscription.
      14              :  *
      15              :  *-------------------------------------------------------------------------
      16              :  */
      17              : 
      18              : #include "postgres.h"
      19              : 
      20              : #include "access/heapam.h"
      21              : #include "access/htup.h"
      22              : #include "access/htup_details.h"
      23              : #include "access/tableam.h"
      24              : #include "access/xact.h"
      25              : #include "catalog/pg_subscription.h"
      26              : #include "catalog/pg_subscription_rel.h"
      27              : #include "funcapi.h"
      28              : #include "lib/dshash.h"
      29              : #include "miscadmin.h"
      30              : #include "pgstat.h"
      31              : #include "postmaster/bgworker.h"
      32              : #include "postmaster/interrupt.h"
      33              : #include "replication/logicallauncher.h"
      34              : #include "replication/origin.h"
      35              : #include "replication/slot.h"
      36              : #include "replication/walreceiver.h"
      37              : #include "replication/worker_internal.h"
      38              : #include "storage/ipc.h"
      39              : #include "storage/proc.h"
      40              : #include "storage/procarray.h"
      41              : #include "storage/subsystems.h"
      42              : #include "tcop/tcopprot.h"
      43              : #include "utils/builtins.h"
      44              : #include "utils/memutils.h"
      45              : #include "utils/pg_lsn.h"
      46              : #include "utils/snapmgr.h"
      47              : #include "utils/syscache.h"
      48              : #include "utils/wait_event.h"
      49              : 
/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L

/*
 * GUC variables: limits on logical replication worker processes — the
 * overall pool size, plus per-subscription caps for sync workers and
 * parallel apply workers.
 */
int         max_logical_replication_workers = 4;
int         max_sync_workers_per_subscription = 2;
int         max_parallel_apply_workers_per_subscription = 2;

/* This process's entry in the shared workers array, if it is a worker. */
LogicalRepWorker *MyLogicalRepWorker = NULL;
      59              : 
/*
 * Shared-memory state for the logical replication launcher and the pool of
 * logical replication workers.
 */
typedef struct LogicalRepCtxStruct
{
    /* Supervisor process. */
    pid_t       launcher_pid;

    /*
     * Hash table holding last start times of subscriptions' apply workers.
     * These are handles to the DSA area and dshash table; backends attach
     * via logicalrep_launcher_attach_dshmem() (declared below).
     */
    dsa_handle  last_start_dsa;
    dshash_table_handle last_start_dsh;

    /* Background workers. */
    LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;

/* Pointer to the shared state above; set up during shmem initialization. */
static LogicalRepCtxStruct *LogicalRepCtx;

/* Shared-memory request/init callbacks registered for this module. */
static void ApplyLauncherShmemRequest(void *arg);
static void ApplyLauncherShmemInit(void *arg);

const ShmemCallbacks ApplyLauncherShmemCallbacks = {
    .request_fn = ApplyLauncherShmemRequest,
    .init_fn = ApplyLauncherShmemInit,
};
      82              : 
/* an entry in the last-start-times shared hash table */
typedef struct LauncherLastStartTimesEntry
{
    Oid         subid;          /* OID of logrep subscription (hash key) */
    TimestampTz last_start_time;    /* last time its apply worker was started */
} LauncherLastStartTimesEntry;

/* parameters for the last-start-times shared hash table */
static const dshash_parameters dsh_params = {
    sizeof(Oid),                /* key size (the subscription OID) */
    sizeof(LauncherLastStartTimesEntry),    /* full entry size */
    dshash_memcmp,              /* raw-memory key comparison */
    dshash_memhash,             /* raw-memory key hashing */
    dshash_memcpy,              /* raw-memory key copy */
    LWTRANCHE_LAUNCHER_HASH     /* LWLock tranche for partition locks */
};

/* Backend-local attachments to the shared last-start-times structures. */
static dsa_area *last_start_times_dsa = NULL;
static dshash_table *last_start_times = NULL;

/*
 * Flag requesting a launcher wakeup; from the name, presumably consumed at
 * transaction commit — the consuming code is elsewhere in this file.
 */
static bool on_commit_launcher_wakeup = false;
     104              : 
     105              : 
/* Forward declarations of file-local routines defined later in this file. */
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
static int  logicalrep_pa_worker_count(Oid subid);
static void logicalrep_launcher_attach_dshmem(void);
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
static bool acquire_conflict_slot_if_exists(void);
static void update_conflict_slot_xmin(TransactionId new_xmin);
static void init_conflict_slot_xmin(void);
     118              : 
     119              : 
     120              : /*
     121              :  * Load the list of subscriptions.
     122              :  *
     123              :  * Only the fields interesting for worker start/stop functions are filled for
     124              :  * each subscription.
     125              :  */
     126              : static List *
     127         3297 : get_subscription_list(void)
     128              : {
     129         3297 :     List       *res = NIL;
     130              :     Relation    rel;
     131              :     TableScanDesc scan;
     132              :     HeapTuple   tup;
     133              :     MemoryContext resultcxt;
     134              : 
     135              :     /* This is the context that we will allocate our output data in */
     136         3297 :     resultcxt = CurrentMemoryContext;
     137              : 
     138              :     /*
     139              :      * Start a transaction so we can access pg_subscription.
     140              :      */
     141         3297 :     StartTransactionCommand();
     142              : 
     143         3297 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     144         3297 :     scan = table_beginscan_catalog(rel, 0, NULL);
     145              : 
     146         4392 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     147              :     {
     148         1095 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     149              :         Subscription *sub;
     150              :         MemoryContext oldcxt;
     151              : 
     152              :         /*
     153              :          * Allocate our results in the caller's context, not the
     154              :          * transaction's. We do this inside the loop, and restore the original
     155              :          * context at the end, so that leaky things like heap_getnext() are
     156              :          * not called in a potentially long-lived context.
     157              :          */
     158         1095 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     159              : 
     160         1095 :         sub = palloc0_object(Subscription);
     161         1095 :         sub->oid = subform->oid;
     162         1095 :         sub->dbid = subform->subdbid;
     163         1095 :         sub->owner = subform->subowner;
     164         1095 :         sub->enabled = subform->subenabled;
     165         1095 :         sub->name = pstrdup(NameStr(subform->subname));
     166         1095 :         sub->retaindeadtuples = subform->subretaindeadtuples;
     167         1095 :         sub->retentionactive = subform->subretentionactive;
     168              :         /* We don't fill fields we are not interested in. */
     169              : 
     170         1095 :         res = lappend(res, sub);
     171         1095 :         MemoryContextSwitchTo(oldcxt);
     172              :     }
     173              : 
     174         3296 :     table_endscan(scan);
     175         3296 :     table_close(rel, AccessShareLock);
     176              : 
     177         3296 :     CommitTransactionCommand();
     178              : 
     179         3296 :     return res;
     180              : }
     181              : 
     182              : /*
     183              :  * Wait for a background worker to start up and attach to the shmem context.
     184              :  *
     185              :  * This is only needed for cleaning up the shared memory in case the worker
     186              :  * fails to attach.
     187              :  *
     188              :  * Returns whether the attach was successful.
     189              :  */
     190              : static bool
     191          476 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     192              :                                uint16 generation,
     193              :                                BackgroundWorkerHandle *handle)
     194              : {
     195          476 :     bool        result = false;
     196          476 :     bool        dropped_latch = false;
     197              : 
     198              :     for (;;)
     199         1136 :     {
     200              :         BgwHandleStatus status;
     201              :         pid_t       pid;
     202              :         int         rc;
     203              : 
     204         1612 :         CHECK_FOR_INTERRUPTS();
     205              : 
     206         1611 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     207              : 
     208              :         /* Worker either died or has started. Return false if died. */
     209         1611 :         if (!worker->in_use || worker->proc)
     210              :         {
     211          471 :             result = worker->in_use;
     212          471 :             LWLockRelease(LogicalRepWorkerLock);
     213          471 :             break;
     214              :         }
     215              : 
     216         1140 :         LWLockRelease(LogicalRepWorkerLock);
     217              : 
     218              :         /* Check if worker has died before attaching, and clean up after it. */
     219         1140 :         status = GetBackgroundWorkerPid(handle, &pid);
     220              : 
     221         1140 :         if (status == BGWH_STOPPED)
     222              :         {
     223            0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     224              :             /* Ensure that this was indeed the worker we waited for. */
     225            0 :             if (generation == worker->generation)
     226            0 :                 logicalrep_worker_cleanup(worker);
     227            0 :             LWLockRelease(LogicalRepWorkerLock);
     228            0 :             break;              /* result is already false */
     229              :         }
     230              : 
     231              :         /*
     232              :          * We need timeout because we generally don't get notified via latch
     233              :          * about the worker attach.  But we don't expect to have to wait long.
     234              :          */
     235         1140 :         rc = WaitLatch(MyLatch,
     236              :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     237              :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     238              : 
     239         1140 :         if (rc & WL_LATCH_SET)
     240              :         {
     241          551 :             ResetLatch(MyLatch);
     242          551 :             CHECK_FOR_INTERRUPTS();
     243          547 :             dropped_latch = true;
     244              :         }
     245              :     }
     246              : 
     247              :     /*
     248              :      * If we had to clear a latch event in order to wait, be sure to restore
     249              :      * it before exiting.  Otherwise caller may miss events.
     250              :      */
     251          471 :     if (dropped_latch)
     252          470 :         SetLatch(MyLatch);
     253              : 
     254          471 :     return result;
     255              : }
     256              : 
     257              : /*
     258              :  * Walks the workers array and searches for one that matches given worker type,
     259              :  * subscription id, and relation id.
     260              :  *
     261              :  * For both apply workers and sequencesync workers, the relid should be set to
     262              :  * InvalidOid, as these workers handle changes across all tables and sequences
     263              :  * respectively, rather than targeting a specific relation. For tablesync
     264              :  * workers, the relid should be set to the OID of the relation being
     265              :  * synchronized.
     266              :  */
     267              : LogicalRepWorker *
     268         3450 : logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
     269              :                        bool only_running)
     270              : {
     271              :     int         i;
     272         3450 :     LogicalRepWorker *res = NULL;
     273              : 
     274              :     /* relid must be valid only for table sync workers */
     275              :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
     276              :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     277              : 
     278              :     /* Search for an attached worker that matches the specified criteria. */
     279        10391 :     for (i = 0; i < max_logical_replication_workers; i++)
     280              :     {
     281         9073 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     282              : 
     283              :         /* Skip parallel apply workers. */
     284         9073 :         if (isParallelApplyWorker(w))
     285            0 :             continue;
     286              : 
     287         9073 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     288         2160 :             w->type == wtype && (!only_running || w->proc))
     289              :         {
     290         2132 :             res = w;
     291         2132 :             break;
     292              :         }
     293              :     }
     294              : 
     295         3450 :     return res;
     296              : }
     297              : 
     298              : /*
     299              :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     300              :  * the subscription, instead of just one.
     301              :  */
     302              : List *
     303          816 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
     304              : {
     305              :     int         i;
     306          816 :     List       *res = NIL;
     307              : 
     308          816 :     if (acquire_lock)
     309          147 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     310              : 
     311              :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     312              : 
     313              :     /* Search for attached worker for a given subscription id. */
     314         4208 :     for (i = 0; i < max_logical_replication_workers; i++)
     315              :     {
     316         3392 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     317              : 
     318         3392 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     319          559 :             res = lappend(res, w);
     320              :     }
     321              : 
     322          816 :     if (acquire_lock)
     323          147 :         LWLockRelease(LogicalRepWorkerLock);
     324              : 
     325          816 :     return res;
     326              : }
     327              : 
     328              : /*
     329              :  * Start new logical replication background worker, if possible.
     330              :  *
     331              :  * Returns true on success, false on failure.
     332              :  */
     333              : bool
     334          476 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
     335              :                          Oid dbid, Oid subid, const char *subname, Oid userid,
     336              :                          Oid relid, dsm_handle subworker_dsm,
     337              :                          bool retain_dead_tuples)
     338              : {
     339              :     BackgroundWorker bgw;
     340              :     BackgroundWorkerHandle *bgw_handle;
     341              :     uint16      generation;
     342              :     int         i;
     343          476 :     int         slot = 0;
     344          476 :     LogicalRepWorker *worker = NULL;
     345              :     int         nsyncworkers;
     346              :     int         nparallelapplyworkers;
     347              :     TimestampTz now;
     348          476 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
     349          476 :     bool        is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
     350          476 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
     351              : 
     352              :     /*----------
     353              :      * Sanity checks:
     354              :      * - must be valid worker type
     355              :      * - tablesync workers are only ones to have relid
     356              :      * - parallel apply worker is the only kind of subworker
     357              :      * - The replication slot used in conflict detection is created when
     358              :      *   retain_dead_tuples is enabled
     359              :      */
     360              :     Assert(wtype != WORKERTYPE_UNKNOWN);
     361              :     Assert(is_tablesync_worker == OidIsValid(relid));
     362              :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
     363              :     Assert(!retain_dead_tuples || MyReplicationSlot);
     364              : 
     365          476 :     ereport(DEBUG1,
     366              :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     367              :                              subname)));
     368              : 
     369              :     /* Report this after the initial starting message for consistency. */
     370          476 :     if (max_active_replication_origins == 0)
     371            0 :         ereport(ERROR,
     372              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     373              :                  errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
     374              : 
     375              :     /*
     376              :      * We need to do the modification of the shared memory under lock so that
     377              :      * we have consistent view.
     378              :      */
     379          476 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     380              : 
     381          476 : retry:
     382              :     /* Find unused worker slot. */
     383          840 :     for (i = 0; i < max_logical_replication_workers; i++)
     384              :     {
     385          840 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     386              : 
     387          840 :         if (!w->in_use)
     388              :         {
     389          476 :             worker = w;
     390          476 :             slot = i;
     391          476 :             break;
     392              :         }
     393              :     }
     394              : 
     395          476 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     396              : 
     397          476 :     now = GetCurrentTimestamp();
     398              : 
     399              :     /*
     400              :      * If we didn't find a free slot, try to do garbage collection.  The
     401              :      * reason we do this is because if some worker failed to start up and its
     402              :      * parent has crashed while waiting, the in_use state was never cleared.
     403              :      */
     404          476 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     405              :     {
     406            0 :         bool        did_cleanup = false;
     407              : 
     408            0 :         for (i = 0; i < max_logical_replication_workers; i++)
     409              :         {
     410            0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     411              : 
     412              :             /*
     413              :              * If the worker was marked in use but didn't manage to attach in
     414              :              * time, clean it up.
     415              :              */
     416            0 :             if (w->in_use && !w->proc &&
     417            0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     418              :                                            wal_receiver_timeout))
     419              :             {
     420            0 :                 elog(WARNING,
     421              :                      "logical replication worker for subscription %u took too long to start; canceled",
     422              :                      w->subid);
     423              : 
     424            0 :                 logicalrep_worker_cleanup(w);
     425            0 :                 did_cleanup = true;
     426              :             }
     427              :         }
     428              : 
     429            0 :         if (did_cleanup)
     430            0 :             goto retry;
     431              :     }
     432              : 
     433              :     /*
     434              :      * We don't allow to invoke more sync workers once we have reached the
     435              :      * sync worker limit per subscription. So, just return silently as we
     436              :      * might get here because of an otherwise harmless race condition.
     437              :      */
     438          476 :     if ((is_tablesync_worker || is_sequencesync_worker) &&
     439          221 :         nsyncworkers >= max_sync_workers_per_subscription)
     440              :     {
     441            0 :         LWLockRelease(LogicalRepWorkerLock);
     442            0 :         return false;
     443              :     }
     444              : 
     445          476 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     446              : 
     447              :     /*
     448              :      * Return false if the number of parallel apply workers reached the limit
     449              :      * per subscription.
     450              :      */
     451          476 :     if (is_parallel_apply_worker &&
     452           13 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     453              :     {
     454            0 :         LWLockRelease(LogicalRepWorkerLock);
     455            0 :         return false;
     456              :     }
     457              : 
     458              :     /*
     459              :      * However if there are no more free worker slots, inform user about it
     460              :      * before exiting.
     461              :      */
     462          476 :     if (worker == NULL)
     463              :     {
     464            0 :         LWLockRelease(LogicalRepWorkerLock);
     465            0 :         ereport(WARNING,
     466              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     467              :                  errmsg("out of logical replication worker slots"),
     468              :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
     469            0 :         return false;
     470              :     }
     471              : 
     472              :     /* Prepare the worker slot. */
     473          476 :     worker->type = wtype;
     474          476 :     worker->launch_time = now;
     475          476 :     worker->in_use = true;
     476          476 :     worker->generation++;
     477          476 :     worker->proc = NULL;
     478          476 :     worker->dbid = dbid;
     479          476 :     worker->userid = userid;
     480          476 :     worker->subid = subid;
     481          476 :     worker->relid = relid;
     482          476 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     483          476 :     worker->relstate_lsn = InvalidXLogRecPtr;
     484          476 :     worker->stream_fileset = NULL;
     485          476 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     486          476 :     worker->parallel_apply = is_parallel_apply_worker;
     487          476 :     worker->oldest_nonremovable_xid = retain_dead_tuples
     488            2 :         ? MyReplicationSlot->data.xmin
     489          476 :         : InvalidTransactionId;
     490          476 :     worker->last_lsn = InvalidXLogRecPtr;
     491          476 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     492          476 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     493          476 :     worker->reply_lsn = InvalidXLogRecPtr;
     494          476 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     495          476 :     worker->last_seqsync_start_time = 0;
     496              : 
     497              :     /* Before releasing lock, remember generation for future identification. */
     498          476 :     generation = worker->generation;
     499              : 
     500          476 :     LWLockRelease(LogicalRepWorkerLock);
     501              : 
     502              :     /* Register the new dynamic worker. */
     503          476 :     memset(&bgw, 0, sizeof(bgw));
     504          476 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     505              :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     506          476 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     507          476 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     508              : 
     509          476 :     switch (worker->type)
     510              :     {
     511          242 :         case WORKERTYPE_APPLY:
     512          242 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     513          242 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     514              :                      "logical replication apply worker for subscription %u",
     515              :                      subid);
     516          242 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
     517          242 :             break;
     518              : 
     519           13 :         case WORKERTYPE_PARALLEL_APPLY:
     520           13 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     521           13 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     522              :                      "logical replication parallel apply worker for subscription %u",
     523              :                      subid);
     524           13 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     525              : 
     526           13 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     527           13 :             break;
     528              : 
     529            9 :         case WORKERTYPE_SEQUENCESYNC:
     530            9 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
     531            9 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     532              :                      "logical replication sequencesync worker for subscription %u",
     533              :                      subid);
     534            9 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
     535            9 :             break;
     536              : 
     537          212 :         case WORKERTYPE_TABLESYNC:
     538          212 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
     539          212 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     540              :                      "logical replication tablesync worker for subscription %u sync %u",
     541              :                      subid,
     542              :                      relid);
     543          212 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
     544          212 :             break;
     545              : 
     546            0 :         case WORKERTYPE_UNKNOWN:
     547              :             /* Should never happen. */
     548            0 :             elog(ERROR, "unknown worker type");
     549              :     }
     550              : 
     551          476 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     552          476 :     bgw.bgw_notify_pid = MyProcPid;
     553          476 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     554              : 
     555          476 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     556              :     {
     557              :         /* Failed to start worker, so clean up the worker slot. */
     558            0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     559              :         Assert(generation == worker->generation);
     560            0 :         logicalrep_worker_cleanup(worker);
     561            0 :         LWLockRelease(LogicalRepWorkerLock);
     562              : 
     563            0 :         ereport(WARNING,
     564              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     565              :                  errmsg("out of background worker slots"),
     566              :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
     567            0 :         return false;
     568              :     }
     569              : 
     570              :     /* Now wait until it attaches. */
     571          476 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     572              : }
     573              : 
/*
 * Internal function to stop the worker and wait until it detaches from the
 * slot.
 *
 * Caller must hold LogicalRepWorkerLock (at least in shared mode) and must
 * pass a slot whose generation it has observed under that lock.  The lock is
 * temporarily released while sleeping, but is held again on return.  The
 * caller is expected to have verified the slot is in use (worker->proc may
 * legitimately still be NULL if the worker is starting up).
 */
static void
logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
{
	uint16		generation;

	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));

	/*
	 * Remember which generation was our worker so we can check if what we see
	 * is still the same one.
	 */
	generation = worker->generation;

	/*
	 * If we found a worker but it does not have proc set then it is still
	 * starting up; wait for it to finish starting and then kill it.
	 */
	while (worker->in_use && !worker->proc)
	{
		int			rc;

		/* Drop the lock so the starting worker can attach to the slot. */
		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		/* Recheck worker status. */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/*
		 * Check whether the worker slot is no longer used, which would mean
		 * that the worker has exited, or whether the worker generation is
		 * different, meaning that a different worker has taken the slot.
		 */
		if (!worker->in_use || worker->generation != generation)
			return;

		/* Worker has assigned proc, so it has started. */
		if (worker->proc)
			break;
	}

	/* Now terminate the worker ... */
	kill(worker->proc->pid, signo);

	/* ... and wait for it to die. */
	for (;;)
	{
		int			rc;

		/* is it gone? (proc cleared on detach, or slot reused) */
		if (!worker->proc || worker->generation != generation)
			break;

		/* Release the lock so the dying worker can detach from the slot. */
		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_SHUTDOWN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	}
}
     656              : 
     657              : /*
     658              :  * Stop the logical replication worker that matches the specified worker type,
     659              :  * subscription id, and relation id.
     660              :  */
     661              : void
     662          102 : logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
     663              : {
     664              :     LogicalRepWorker *worker;
     665              : 
     666              :     /* relid must be valid only for table sync workers */
     667              :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
     668              : 
     669          102 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     670              : 
     671          102 :     worker = logicalrep_worker_find(wtype, subid, relid, false);
     672              : 
     673          102 :     if (worker)
     674              :     {
     675              :         Assert(!isParallelApplyWorker(worker));
     676           80 :         logicalrep_worker_stop_internal(worker, SIGTERM);
     677              :     }
     678              : 
     679          102 :     LWLockRelease(LogicalRepWorkerLock);
     680          102 : }
     681              : 
/*
 * Stop the given logical replication parallel apply worker.
 *
 * Note that the function sends SIGUSR2 instead of SIGTERM to the parallel apply
 * worker so that the worker exits cleanly.
 */
void
logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
{
	int			slot_no;
	uint16		generation;
	LogicalRepWorker *worker;

	/*
	 * Take a consistent snapshot of the worker's slot number and generation
	 * under the shared-memory mutex; these identify which incarnation of the
	 * slot we are allowed to stop.
	 */
	SpinLockAcquire(&winfo->shared->mutex);
	generation = winfo->shared->logicalrep_worker_generation;
	slot_no = winfo->shared->logicalrep_worker_slot_no;
	SpinLockRelease(&winfo->shared->mutex);

	Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);

	/*
	 * Detach from the error_mq_handle for the parallel apply worker before
	 * stopping it. This prevents the leader apply worker from trying to
	 * receive the message from the error queue that might already be detached
	 * by the parallel apply worker.
	 */
	if (winfo->error_mq_handle)
	{
		shm_mq_detach(winfo->error_mq_handle);
		winfo->error_mq_handle = NULL;
	}

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	worker = &LogicalRepCtx->workers[slot_no];
	Assert(isParallelApplyWorker(worker));

	/*
	 * Only stop the worker if the generation matches and the worker is alive.
	 * A different generation means the slot was already recycled for some
	 * other worker.
	 */
	if (worker->generation == generation && worker->proc)
		logicalrep_worker_stop_internal(worker, SIGUSR2);

	LWLockRelease(LogicalRepWorkerLock);
}
     727              : 
     728              : /*
     729              :  * Wake up (using latch) any logical replication worker that matches the
     730              :  * specified worker type, subscription id, and relation id.
     731              :  */
     732              : void
     733          229 : logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
     734              : {
     735              :     LogicalRepWorker *worker;
     736              : 
     737              :     /* relid must be valid only for table sync workers */
     738              :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
     739              : 
     740          229 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     741              : 
     742          229 :     worker = logicalrep_worker_find(wtype, subid, relid, true);
     743              : 
     744          229 :     if (worker)
     745          229 :         logicalrep_worker_wakeup_ptr(worker);
     746              : 
     747          229 :     LWLockRelease(LogicalRepWorkerLock);
     748          229 : }
     749              : 
     750              : /*
     751              :  * Wake up (using latch) the specified logical replication worker.
     752              :  *
     753              :  * Caller must hold lock, else worker->proc could change under us.
     754              :  */
     755              : void
     756          701 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     757              : {
     758              :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     759              : 
     760          701 :     SetLatch(&worker->proc->procLatch);
     761          701 : }
     762              : 
     763              : /*
     764              :  * Attach to a slot.
     765              :  */
     766              : void
     767          633 : logicalrep_worker_attach(int slot)
     768              : {
     769              :     /* Block concurrent access. */
     770          633 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     771              : 
     772              :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     773          633 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     774              : 
     775          633 :     if (!MyLogicalRepWorker->in_use)
     776              :     {
     777            0 :         LWLockRelease(LogicalRepWorkerLock);
     778            0 :         ereport(ERROR,
     779              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     780              :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     781              :                         slot)));
     782              :     }
     783              : 
     784          633 :     if (MyLogicalRepWorker->proc)
     785              :     {
     786            0 :         LWLockRelease(LogicalRepWorkerLock);
     787            0 :         ereport(ERROR,
     788              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     789              :                  errmsg("logical replication worker slot %d is already used by "
     790              :                         "another worker, cannot attach", slot)));
     791              :     }
     792              : 
     793          633 :     MyLogicalRepWorker->proc = MyProc;
     794          633 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     795              : 
     796          633 :     LWLockRelease(LogicalRepWorkerLock);
     797          633 : }
     798              : 
/*
 * Stop the parallel apply workers if any, and detach the leader apply worker
 * (cleans up the worker info).
 *
 * Runs from the worker's before_shmem_exit callback; see
 * logicalrep_worker_onexit().
 */
static void
logicalrep_worker_detach(void)
{
	/* Stop the parallel apply workers. */
	if (am_leader_apply_worker())
	{
		List	   *workers;
		ListCell   *lc;

		/*
		 * Detach from the error_mq_handle for all parallel apply workers
		 * before terminating them. This prevents the leader apply worker from
		 * receiving the worker termination message and sending it to logs
		 * when the same is already done by the parallel worker.
		 */
		pa_detach_all_error_mq();

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/* Find all workers (including starting ones) for our subscription. */
		workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
		foreach(lc, workers)
		{
			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);

			if (isParallelApplyWorker(w))
				logicalrep_worker_stop_internal(w, SIGTERM);
		}

		LWLockRelease(LogicalRepWorkerLock);

		list_free(workers);
	}

	/* Block concurrent access. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	/* Free our own slot for reuse. */
	logicalrep_worker_cleanup(MyLogicalRepWorker);

	LWLockRelease(LogicalRepWorkerLock);
}
     843              : 
     844              : /*
     845              :  * Clean up worker info.
     846              :  */
     847              : static void
     848          633 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     849              : {
     850              :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     851              : 
     852          633 :     worker->type = WORKERTYPE_UNKNOWN;
     853          633 :     worker->in_use = false;
     854          633 :     worker->proc = NULL;
     855          633 :     worker->dbid = InvalidOid;
     856          633 :     worker->userid = InvalidOid;
     857          633 :     worker->subid = InvalidOid;
     858          633 :     worker->relid = InvalidOid;
     859          633 :     worker->leader_pid = InvalidPid;
     860          633 :     worker->parallel_apply = false;
     861          633 : }
     862              : 
/*
 * Cleanup function for logical replication launcher.
 *
 * Called on logical replication launcher exit.  Clearing launcher_pid tells
 * other backends (see ApplyLauncherWakeup) that there is no launcher to
 * signal.
 */
static void
logicalrep_launcher_onexit(int code, Datum arg)
{
	LogicalRepCtx->launcher_pid = 0;
}
     873              : 
     874              : /*
     875              :  * Reset the last_seqsync_start_time of the sequencesync worker in the
     876              :  * subscription's apply worker.
     877              :  *
     878              :  * Note that this value is not stored in the sequencesync worker, because that
     879              :  * has finished already and is about to exit.
     880              :  */
     881              : void
     882            5 : logicalrep_reset_seqsync_start_time(void)
     883              : {
     884              :     LogicalRepWorker *worker;
     885              : 
     886              :     /*
     887              :      * The apply worker can't access last_seqsync_start_time concurrently, so
     888              :      * it is okay to use SHARED lock here. See ProcessSequencesForSync().
     889              :      */
     890            5 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     891              : 
     892            5 :     worker = logicalrep_worker_find(WORKERTYPE_APPLY,
     893            5 :                                     MyLogicalRepWorker->subid, InvalidOid,
     894              :                                     true);
     895            5 :     if (worker)
     896            5 :         worker->last_seqsync_start_time = 0;
     897              : 
     898            5 :     LWLockRelease(LogicalRepWorkerLock);
     899            5 : }
     900              : 
/*
 * Cleanup function.
 *
 * Called on logical replication worker exit.  The steps below are ordered:
 * disconnect from the publisher first, then release the shared worker slot,
 * then clean up local resources, and finally poke the launcher so it can
 * react to the freed slot.
 */
static void
logicalrep_worker_onexit(int code, Datum arg)
{
	/* Disconnect gracefully from the remote side. */
	if (LogRepWorkerWalRcvConn)
		walrcv_disconnect(LogRepWorkerWalRcvConn);

	/* Stop any parallel apply workers and free our shared slot. */
	logicalrep_worker_detach();

	/* Cleanup fileset used for streaming transactions. */
	if (MyLogicalRepWorker->stream_fileset != NULL)
		FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);

	/*
	 * Session level locks may be acquired outside of a transaction in
	 * parallel apply mode and will not be released when the worker
	 * terminates, so manually release all locks before the worker exits.
	 *
	 * The locks will be acquired once the worker is initialized.
	 */
	if (!InitializingApplyWorker)
		LockReleaseAll(DEFAULT_LOCKMETHOD, true);

	ApplyLauncherWakeup();
}
     931              : 
     932              : /*
     933              :  * Count the number of registered (not necessarily running) sync workers
     934              :  * for a subscription.
     935              :  */
     936              : int
     937         1365 : logicalrep_sync_worker_count(Oid subid)
     938              : {
     939              :     int         i;
     940         1365 :     int         res = 0;
     941              : 
     942              :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     943              : 
     944              :     /* Search for attached worker for a given subscription id. */
     945         7027 :     for (i = 0; i < max_logical_replication_workers; i++)
     946              :     {
     947         5662 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     948              : 
     949         5662 :         if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
     950         1441 :             res++;
     951              :     }
     952              : 
     953         1365 :     return res;
     954              : }
     955              : 
     956              : /*
     957              :  * Count the number of registered (but not necessarily running) parallel apply
     958              :  * workers for a subscription.
     959              :  */
     960              : static int
     961          476 : logicalrep_pa_worker_count(Oid subid)
     962              : {
     963              :     int         i;
     964          476 :     int         res = 0;
     965              : 
     966              :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     967              : 
     968              :     /*
     969              :      * Scan all attached parallel apply workers, only counting those which
     970              :      * have the given subscription id.
     971              :      */
     972         2504 :     for (i = 0; i < max_logical_replication_workers; i++)
     973              :     {
     974         2028 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     975              : 
     976         2028 :         if (isParallelApplyWorker(w) && w->subid == subid)
     977            2 :             res++;
     978              :     }
     979              : 
     980          476 :     return res;
     981              : }
     982              : 
     983              : /*
     984              :  * ApplyLauncherShmemRequest
     985              :  *      Register shared memory space needed for replication launcher
     986              :  */
     987              : static void
     988         1233 : ApplyLauncherShmemRequest(void *arg)
     989              : {
     990              :     Size        size;
     991              : 
     992              :     /*
     993              :      * Need the fixed struct and the array of LogicalRepWorker.
     994              :      */
     995         1233 :     size = sizeof(LogicalRepCtxStruct);
     996         1233 :     size = MAXALIGN(size);
     997         1233 :     size = add_size(size, mul_size(max_logical_replication_workers,
     998              :                                    sizeof(LogicalRepWorker)));
     999         1233 :     ShmemRequestStruct(.name = "Logical Replication Launcher Data",
    1000              :                        .size = size,
    1001              :                        .ptr = (void **) &LogicalRepCtx,
    1002              :         );
    1003         1233 : }
    1004              : 
    1005              : /*
    1006              :  * ApplyLauncherRegister
    1007              :  *      Register a background worker running the logical replication launcher.
    1008              :  */
    1009              : void
    1010          993 : ApplyLauncherRegister(void)
    1011              : {
    1012              :     BackgroundWorker bgw;
    1013              : 
    1014              :     /*
    1015              :      * The logical replication launcher is disabled during binary upgrades, to
    1016              :      * prevent logical replication workers from running on the source cluster.
    1017              :      * That could cause replication origins to move forward after having been
    1018              :      * copied to the target cluster, potentially creating conflicts with the
    1019              :      * copied data files.
    1020              :      */
    1021          993 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
    1022           61 :         return;
    1023              : 
    1024          932 :     memset(&bgw, 0, sizeof(bgw));
    1025          932 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
    1026              :         BGWORKER_BACKEND_DATABASE_CONNECTION;
    1027          932 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
    1028          932 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
    1029          932 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
    1030          932 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
    1031              :              "logical replication launcher");
    1032          932 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
    1033              :              "logical replication launcher");
    1034          932 :     bgw.bgw_restart_time = 5;
    1035          932 :     bgw.bgw_notify_pid = 0;
    1036          932 :     bgw.bgw_main_arg = (Datum) 0;
    1037              : 
    1038          932 :     RegisterBackgroundWorker(&bgw);
    1039              : }
    1040              : 
    1041              : /*
    1042              :  * ApplyLauncherShmemInit
    1043              :  *      Initialize replication launcher shared memory
    1044              :  */
    1045              : static void
    1046         1230 : ApplyLauncherShmemInit(void *arg)
    1047              : {
    1048              :     int         slot;
    1049              : 
    1050         1230 :     LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
    1051         1230 :     LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
    1052              : 
    1053              :     /* Initialize memory and spin locks for each worker slot. */
    1054         6113 :     for (slot = 0; slot < max_logical_replication_workers; slot++)
    1055              :     {
    1056         4883 :         LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
    1057              : 
    1058         4883 :         memset(worker, 0, sizeof(LogicalRepWorker));
    1059         4883 :         SpinLockInit(&worker->relmutex);
    1060              :     }
    1061         1230 : }
    1062              : 
/*
 * Initialize or attach to the dynamic shared hash table that stores the
 * last-start times, if not already done.
 * This must be called before accessing the table.
 *
 * On return, last_start_times_dsa and last_start_times are valid in this
 * backend.  The first caller cluster-wide creates the table and publishes
 * its handles in shared memory; later callers merely attach.
 */
static void
logicalrep_launcher_attach_dshmem(void)
{
	MemoryContext oldcontext;

	/* Quick exit if we already did this. */
	if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
		last_start_times != NULL)
		return;

	/* Otherwise, use a lock to ensure only one process creates the table. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	/* Be sure any local memory allocated by DSA routines is persistent. */
	oldcontext = MemoryContextSwitchTo(TopMemoryContext);

	if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
	{
		/* Initialize dynamic shared hash table for last-start times. */
		last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
		/* Keep the DSA area alive across all backends and detaches. */
		dsa_pin(last_start_times_dsa);
		dsa_pin_mapping(last_start_times_dsa);
		last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);

		/* Store handles in shared memory for other backends to use. */
		LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
		LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
	}
	else if (!last_start_times)
	{
		/* Attach to existing dynamic shared hash table. */
		last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
		dsa_pin_mapping(last_start_times_dsa);
		last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
										 LogicalRepCtx->last_start_dsh, NULL);
	}

	MemoryContextSwitchTo(oldcontext);
	LWLockRelease(LogicalRepWorkerLock);
}
    1108              : 
    1109              : /*
    1110              :  * Set the last-start time for the subscription.
    1111              :  */
    1112              : static void
    1113          242 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
    1114              : {
    1115              :     LauncherLastStartTimesEntry *entry;
    1116              :     bool        found;
    1117              : 
    1118          242 :     logicalrep_launcher_attach_dshmem();
    1119              : 
    1120          242 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
    1121          242 :     entry->last_start_time = start_time;
    1122          242 :     dshash_release_lock(last_start_times, entry);
    1123          242 : }
    1124              : 
    1125              : /*
    1126              :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1127              :  */
    1128              : static TimestampTz
    1129          407 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1130              : {
    1131              :     LauncherLastStartTimesEntry *entry;
    1132              :     TimestampTz ret;
    1133              : 
    1134          407 :     logicalrep_launcher_attach_dshmem();
    1135              : 
    1136          407 :     entry = dshash_find(last_start_times, &subid, false);
    1137          407 :     if (entry == NULL)
    1138          134 :         return 0;
    1139              : 
    1140          273 :     ret = entry->last_start_time;
    1141          273 :     dshash_release_lock(last_start_times, entry);
    1142              : 
    1143          273 :     return ret;
    1144              : }
    1145              : 
/*
 * Remove the last-start-time entry for the subscription, if one exists.
 *
 * This has two use-cases: to remove the entry related to a subscription
 * that's been deleted or disabled (just to avoid leaking shared memory),
 * and to allow immediate restart of an apply worker that has exited
 * due to subscription parameter changes.
 */
void
ApplyLauncherForgetWorkerStartTime(Oid subid)
{
	logicalrep_launcher_attach_dshmem();

	/* It's fine if the key is absent; we ignore the result. */
	(void) dshash_delete_key(last_start_times, &subid);
}
    1161              : 
    1162              : /*
    1163              :  * Wakeup the launcher on commit if requested.
    1164              :  */
    1165              : void
    1166       626962 : AtEOXact_ApplyLauncher(bool isCommit)
    1167              : {
    1168       626962 :     if (isCommit)
    1169              :     {
    1170       591329 :         if (on_commit_launcher_wakeup)
    1171          154 :             ApplyLauncherWakeup();
    1172              :     }
    1173              : 
    1174       626962 :     on_commit_launcher_wakeup = false;
    1175       626962 : }
    1176              : 
    1177              : /*
    1178              :  * Request wakeup of the launcher on commit of the transaction.
    1179              :  *
    1180              :  * This is used to send launcher signal to stop sleeping and process the
    1181              :  * subscriptions when current transaction commits. Should be used when new
    1182              :  * tuple was added to the pg_subscription catalog.
    1183              : */
    1184              : void
    1185          155 : ApplyLauncherWakeupAtCommit(void)
    1186              : {
    1187          155 :     if (!on_commit_launcher_wakeup)
    1188          154 :         on_commit_launcher_wakeup = true;
    1189          155 : }
    1190              : 
    1191              : /*
    1192              :  * Wakeup the launcher immediately.
    1193              :  */
    1194              : void
    1195          837 : ApplyLauncherWakeup(void)
    1196              : {
    1197          837 :     if (LogicalRepCtx->launcher_pid != 0)
    1198          819 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1199          837 : }
    1200              : 
    1201              : /*
    1202              :  * Main loop for the apply launcher process.
    1203              :  */
    1204              : void
    1205          510 : ApplyLauncherMain(Datum main_arg)
    1206              : {
    1207          510 :     ereport(DEBUG1,
    1208              :             (errmsg_internal("logical replication launcher started")));
    1209              : 
    1210          510 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1211              : 
    1212              :     Assert(LogicalRepCtx->launcher_pid == 0);
    1213          510 :     LogicalRepCtx->launcher_pid = MyProcPid;
    1214              : 
    1215              :     /* Establish signal handlers. */
    1216          510 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1217          510 :     BackgroundWorkerUnblockSignals();
    1218              : 
    1219              :     /*
    1220              :      * Establish connection to nailed catalogs (we only ever access
    1221              :      * pg_subscription).
    1222              :      */
    1223          510 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1224              : 
    1225              :     /*
    1226              :      * Acquire the conflict detection slot at startup to ensure it can be
    1227              :      * dropped if no longer needed after a restart.
    1228              :      */
    1229          510 :     acquire_conflict_slot_if_exists();
    1230              : 
    1231              :     /* Enter main loop */
    1232              :     for (;;)
    1233         2788 :     {
    1234              :         int         rc;
    1235              :         List       *sublist;
    1236              :         ListCell   *lc;
    1237              :         MemoryContext subctx;
    1238              :         MemoryContext oldctx;
    1239         3298 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1240         3298 :         bool        can_update_xmin = true;
    1241         3298 :         bool        retain_dead_tuples = false;
    1242         3298 :         TransactionId xmin = InvalidTransactionId;
    1243              : 
    1244         3298 :         CHECK_FOR_INTERRUPTS();
    1245              : 
    1246              :         /* Use temporary context to avoid leaking memory across cycles. */
    1247         3297 :         subctx = AllocSetContextCreate(TopMemoryContext,
    1248              :                                        "Logical Replication Launcher sublist",
    1249              :                                        ALLOCSET_DEFAULT_SIZES);
    1250         3297 :         oldctx = MemoryContextSwitchTo(subctx);
    1251              : 
    1252              :         /*
    1253              :          * Start any missing workers for enabled subscriptions.
    1254              :          *
    1255              :          * Also, during the iteration through all subscriptions, we compute
    1256              :          * the minimum XID required to protect deleted tuples for conflict
    1257              :          * detection if one of the subscription enables retain_dead_tuples
    1258              :          * option.
    1259              :          */
    1260         3297 :         sublist = get_subscription_list();
    1261         4387 :         foreach(lc, sublist)
    1262              :         {
    1263         1095 :             Subscription *sub = (Subscription *) lfirst(lc);
    1264              :             LogicalRepWorker *w;
    1265              :             TimestampTz last_start;
    1266              :             TimestampTz now;
    1267              :             long        elapsed;
    1268              : 
    1269         1095 :             if (sub->retaindeadtuples)
    1270              :             {
    1271          127 :                 retain_dead_tuples = true;
    1272              : 
    1273              :                 /*
    1274              :                  * Create a replication slot to retain information necessary
    1275              :                  * for conflict detection such as dead tuples, commit
    1276              :                  * timestamps, and origins.
    1277              :                  *
    1278              :                  * The slot is created before starting the apply worker to
    1279              :                  * prevent it from unnecessarily maintaining its
    1280              :                  * oldest_nonremovable_xid.
    1281              :                  *
    1282              :                  * The slot is created even for a disabled subscription to
    1283              :                  * ensure that conflict-related information is available when
    1284              :                  * applying remote changes that occurred before the
    1285              :                  * subscription was enabled.
    1286              :                  */
    1287          127 :                 CreateConflictDetectionSlot();
    1288              : 
    1289          127 :                 if (sub->retentionactive)
    1290              :                 {
    1291              :                     /*
    1292              :                      * Can't advance xmin of the slot unless all the
    1293              :                      * subscriptions actively retaining dead tuples are
    1294              :                      * enabled. This is required to ensure that we don't
    1295              :                      * advance the xmin of CONFLICT_DETECTION_SLOT if one of
    1296              :                      * the subscriptions is not enabled. Otherwise, we won't
    1297              :                      * be able to detect conflicts reliably for such a
    1298              :                      * subscription even though it has set the
    1299              :                      * retain_dead_tuples option.
    1300              :                      */
    1301          127 :                     can_update_xmin &= sub->enabled;
    1302              : 
    1303              :                     /*
    1304              :                      * Initialize the slot once the subscription activates
    1305              :                      * retention.
    1306              :                      */
    1307          127 :                     if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
    1308            0 :                         init_conflict_slot_xmin();
    1309              :                 }
    1310              :             }
    1311              : 
    1312         1095 :             if (!sub->enabled)
    1313           43 :                 continue;
    1314              : 
    1315         1052 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1316         1052 :             w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
    1317              :                                        false);
    1318              : 
    1319         1052 :             if (w != NULL)
    1320              :             {
    1321              :                 /*
    1322              :                  * Compute the minimum xmin required to protect dead tuples
    1323              :                  * required for conflict detection among all running apply
    1324              :                  * workers. This computation is performed while holding
    1325              :                  * LogicalRepWorkerLock to prevent accessing invalid worker
    1326              :                  * data, in scenarios where a worker might exit and reset its
    1327              :                  * state concurrently.
    1328              :                  */
    1329          645 :                 if (sub->retaindeadtuples &&
    1330          123 :                     sub->retentionactive &&
    1331              :                     can_update_xmin)
    1332          123 :                     compute_min_nonremovable_xid(w, &xmin);
    1333              : 
    1334          645 :                 LWLockRelease(LogicalRepWorkerLock);
    1335              : 
    1336              :                 /* worker is running already */
    1337          645 :                 continue;
    1338              :             }
    1339              : 
    1340          407 :             LWLockRelease(LogicalRepWorkerLock);
    1341              : 
    1342              :             /*
    1343              :              * Can't advance xmin of the slot unless all the workers
    1344              :              * corresponding to subscriptions actively retaining dead tuples
    1345              :              * are running, disabling the further computation of the minimum
    1346              :              * nonremovable xid.
    1347              :              */
    1348          407 :             if (sub->retaindeadtuples && sub->retentionactive)
    1349            2 :                 can_update_xmin = false;
    1350              : 
    1351              :             /*
    1352              :              * If the worker is eligible to start now, launch it.  Otherwise,
    1353              :              * adjust wait_time so that we'll wake up as soon as it can be
    1354              :              * started.
    1355              :              *
    1356              :              * Each subscription's apply worker can only be restarted once per
    1357              :              * wal_retrieve_retry_interval, so that errors do not cause us to
    1358              :              * repeatedly restart the worker as fast as possible.  In cases
    1359              :              * where a restart is expected (e.g., subscription parameter
    1360              :              * changes), another process should remove the last-start entry
    1361              :              * for the subscription so that the worker can be restarted
    1362              :              * without waiting for wal_retrieve_retry_interval to elapse.
    1363              :              */
    1364          407 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1365          407 :             now = GetCurrentTimestamp();
    1366          407 :             if (last_start == 0 ||
    1367          273 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1368              :             {
    1369          242 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1370          245 :                 if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
    1371          242 :                                               sub->dbid, sub->oid, sub->name,
    1372              :                                               sub->owner, InvalidOid,
    1373              :                                               DSM_HANDLE_INVALID,
    1374          244 :                                               sub->retaindeadtuples &&
    1375          244 :                                               sub->retentionactive))
    1376              :                 {
    1377              :                     /*
    1378              :                      * We get here either if we failed to launch a worker
    1379              :                      * (perhaps for resource-exhaustion reasons) or if we
    1380              :                      * launched one but it immediately quit.  Either way, it
    1381              :                      * seems appropriate to try again after
    1382              :                      * wal_retrieve_retry_interval.
    1383              :                      */
    1384            7 :                     wait_time = Min(wait_time,
    1385              :                                     wal_retrieve_retry_interval);
    1386              :                 }
    1387              :             }
    1388              :             else
    1389              :             {
    1390          165 :                 wait_time = Min(wait_time,
    1391              :                                 wal_retrieve_retry_interval - elapsed);
    1392              :             }
    1393              :         }
    1394              : 
    1395              :         /*
    1396              :          * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
    1397              :          * that requires us to retain dead tuples. Otherwise, if required,
    1398              :          * advance the slot's xmin to protect dead tuples required for the
    1399              :          * conflict detection.
    1400              :          *
    1401              :          * Additionally, if all apply workers for subscriptions with
    1402              :          * retain_dead_tuples enabled have requested to stop retention, the
    1403              :          * slot's xmin will be set to InvalidTransactionId allowing the
    1404              :          * removal of dead tuples.
    1405              :          */
    1406         3292 :         if (MyReplicationSlot)
    1407              :         {
    1408          128 :             if (!retain_dead_tuples)
    1409            1 :                 ReplicationSlotDropAcquired();
    1410          127 :             else if (can_update_xmin)
    1411          123 :                 update_conflict_slot_xmin(xmin);
    1412              :         }
    1413              : 
    1414              :         /* Switch back to original memory context. */
    1415         3292 :         MemoryContextSwitchTo(oldctx);
    1416              :         /* Clean the temporary memory. */
    1417         3292 :         MemoryContextDelete(subctx);
    1418              : 
    1419              :         /* Wait for more work. */
    1420         3292 :         rc = WaitLatch(MyLatch,
    1421              :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1422              :                        wait_time,
    1423              :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1424              : 
    1425         3289 :         if (rc & WL_LATCH_SET)
    1426              :         {
    1427         3242 :             ResetLatch(MyLatch);
    1428         3242 :             CHECK_FOR_INTERRUPTS();
    1429              :         }
    1430              : 
    1431         2788 :         if (ConfigReloadPending)
    1432              :         {
    1433           50 :             ConfigReloadPending = false;
    1434           50 :             ProcessConfigFile(PGC_SIGHUP);
    1435              :         }
    1436              :     }
    1437              : 
    1438              :     /* Not reachable */
    1439              : }
    1440              : 
    1441              : /*
    1442              :  * Determine the minimum non-removable transaction ID across all apply workers
    1443              :  * for subscriptions that have retain_dead_tuples enabled. Store the result
    1444              :  * in *xmin.
    1445              :  */
    1446              : static void
    1447          123 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
    1448              : {
    1449              :     TransactionId nonremovable_xid;
    1450              : 
    1451              :     Assert(worker != NULL);
    1452              : 
    1453              :     /*
    1454              :      * The replication slot for conflict detection must be created before the
    1455              :      * worker starts.
    1456              :      */
    1457              :     Assert(MyReplicationSlot);
    1458              : 
    1459          123 :     SpinLockAcquire(&worker->relmutex);
    1460          123 :     nonremovable_xid = worker->oldest_nonremovable_xid;
    1461          123 :     SpinLockRelease(&worker->relmutex);
    1462              : 
    1463              :     /*
    1464              :      * Return if the apply worker has stopped retention concurrently.
    1465              :      *
    1466              :      * Although this function is invoked only when retentionactive is true,
    1467              :      * the apply worker might stop retention after the launcher fetches the
    1468              :      * retentionactive flag.
    1469              :      */
    1470          123 :     if (!TransactionIdIsValid(nonremovable_xid))
    1471            0 :         return;
    1472              : 
    1473          123 :     if (!TransactionIdIsValid(*xmin) ||
    1474            0 :         TransactionIdPrecedes(nonremovable_xid, *xmin))
    1475          123 :         *xmin = nonremovable_xid;
    1476              : }
    1477              : 
    1478              : /*
    1479              :  * Acquire the replication slot used to retain information for conflict
    1480              :  * detection, if it exists.
    1481              :  *
    1482              :  * Return true if successfully acquired, otherwise return false.
    1483              :  */
    1484              : static bool
    1485          510 : acquire_conflict_slot_if_exists(void)
    1486              : {
    1487          510 :     if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
    1488          509 :         return false;
    1489              : 
    1490            1 :     ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
    1491            1 :     return true;
    1492              : }
    1493              : 
    1494              : /*
    1495              :  * Update the xmin the replication slot used to retain information required
    1496              :  * for conflict detection.
    1497              :  */
    1498              : static void
    1499          123 : update_conflict_slot_xmin(TransactionId new_xmin)
    1500              : {
    1501              :     Assert(MyReplicationSlot);
    1502              :     Assert(!TransactionIdIsValid(new_xmin) ||
    1503              :            TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
    1504              : 
    1505              :     /* Return if the xmin value of the slot cannot be updated */
    1506          123 :     if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
    1507          100 :         return;
    1508              : 
    1509           23 :     SpinLockAcquire(&MyReplicationSlot->mutex);
    1510           23 :     MyReplicationSlot->effective_xmin = new_xmin;
    1511           23 :     MyReplicationSlot->data.xmin = new_xmin;
    1512           23 :     SpinLockRelease(&MyReplicationSlot->mutex);
    1513              : 
    1514           23 :     elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
    1515              : 
    1516           23 :     ReplicationSlotMarkDirty();
    1517           23 :     ReplicationSlotsComputeRequiredXmin(false);
    1518              : 
    1519              :     /*
    1520              :      * Like PhysicalConfirmReceivedLocation(), do not save slot information
    1521              :      * each time. This is acceptable because all concurrent transactions on
    1522              :      * the publisher that require the data preceding the slot's xmin should
    1523              :      * have already been applied and flushed on the subscriber before the xmin
    1524              :      * is advanced. So, even if the slot's xmin regresses after a restart, it
    1525              :      * will be advanced again in the next cycle. Therefore, no data required
    1526              :      * for conflict detection will be prematurely removed.
    1527              :      */
    1528           23 :     return;
    1529              : }
    1530              : 
    1531              : /*
    1532              :  * Initialize the xmin for the conflict detection slot.
    1533              :  */
    1534              : static void
    1535            4 : init_conflict_slot_xmin(void)
    1536              : {
    1537              :     TransactionId xmin_horizon;
    1538              : 
    1539              :     /* Replication slot must exist but shouldn't be initialized. */
    1540              :     Assert(MyReplicationSlot &&
    1541              :            !TransactionIdIsValid(MyReplicationSlot->data.xmin));
    1542              : 
    1543            4 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    1544            4 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    1545              : 
    1546            4 :     xmin_horizon = GetOldestSafeDecodingTransactionId(false);
    1547              : 
    1548            4 :     SpinLockAcquire(&MyReplicationSlot->mutex);
    1549            4 :     MyReplicationSlot->effective_xmin = xmin_horizon;
    1550            4 :     MyReplicationSlot->data.xmin = xmin_horizon;
    1551            4 :     SpinLockRelease(&MyReplicationSlot->mutex);
    1552              : 
    1553            4 :     ReplicationSlotsComputeRequiredXmin(true);
    1554              : 
    1555            4 :     LWLockRelease(ProcArrayLock);
    1556            4 :     LWLockRelease(ReplicationSlotControlLock);
    1557              : 
    1558              :     /* Write this slot to disk */
    1559            4 :     ReplicationSlotMarkDirty();
    1560            4 :     ReplicationSlotSave();
    1561            4 : }
    1562              : 
    1563              : /*
    1564              :  * Create and acquire the replication slot used to retain information for
    1565              :  * conflict detection, if not yet.
    1566              :  */
    1567              : void
    1568          128 : CreateConflictDetectionSlot(void)
    1569              : {
    1570              :     /* Exit early, if the replication slot is already created and acquired */
    1571          128 :     if (MyReplicationSlot)
    1572          124 :         return;
    1573              : 
    1574            4 :     ereport(LOG,
    1575              :             errmsg("creating replication conflict detection slot"));
    1576              : 
    1577            4 :     ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
    1578              :                           false, false);
    1579              : 
    1580            4 :     init_conflict_slot_xmin();
    1581              : }
    1582              : 
    1583              : /*
    1584              :  * Is current process the logical replication launcher?
    1585              :  */
    1586              : bool
    1587         2750 : IsLogicalLauncher(void)
    1588              : {
    1589         2750 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1590              : }
    1591              : 
    1592              : /*
    1593              :  * Return the pid of the leader apply worker if the given pid is the pid of a
    1594              :  * parallel apply worker, otherwise, return InvalidPid.
    1595              :  */
    1596              : pid_t
    1597          939 : GetLeaderApplyWorkerPid(pid_t pid)
    1598              : {
    1599          939 :     int         leader_pid = InvalidPid;
    1600              :     int         i;
    1601              : 
    1602          939 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1603              : 
    1604         4695 :     for (i = 0; i < max_logical_replication_workers; i++)
    1605              :     {
    1606         3756 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1607              : 
    1608         3756 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
    1609              :         {
    1610            0 :             leader_pid = w->leader_pid;
    1611            0 :             break;
    1612              :         }
    1613              :     }
    1614              : 
    1615          939 :     LWLockRelease(LogicalRepWorkerLock);
    1616              : 
    1617          939 :     return leader_pid;
    1618              : }
    1619              : 
    1620              : /*
    1621              :  * Returns state of the subscriptions.
    1622              :  */
    1623              : Datum
    1624            1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1625              : {
    1626              : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
    1627            1 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1628              :     int         i;
    1629            1 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1630              : 
    1631            1 :     InitMaterializedSRF(fcinfo, 0);
    1632              : 
    1633              :     /* Make sure we get consistent view of the workers. */
    1634            1 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1635              : 
    1636            5 :     for (i = 0; i < max_logical_replication_workers; i++)
    1637              :     {
    1638              :         /* for each row */
    1639            4 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1640            4 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1641              :         int         worker_pid;
    1642              :         LogicalRepWorker worker;
    1643              : 
    1644            4 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1645              :                sizeof(LogicalRepWorker));
    1646            4 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1647            2 :             continue;
    1648              : 
    1649            2 :         if (OidIsValid(subid) && worker.subid != subid)
    1650            0 :             continue;
    1651              : 
    1652            2 :         worker_pid = worker.proc->pid;
    1653              : 
    1654            2 :         values[0] = ObjectIdGetDatum(worker.subid);
    1655            2 :         if (isTableSyncWorker(&worker))
    1656            0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1657              :         else
    1658            2 :             nulls[1] = true;
    1659            2 :         values[2] = Int32GetDatum(worker_pid);
    1660              : 
    1661            2 :         if (isParallelApplyWorker(&worker))
    1662            0 :             values[3] = Int32GetDatum(worker.leader_pid);
    1663              :         else
    1664            2 :             nulls[3] = true;
    1665              : 
    1666            2 :         if (!XLogRecPtrIsValid(worker.last_lsn))
    1667            0 :             nulls[4] = true;
    1668              :         else
    1669            2 :             values[4] = LSNGetDatum(worker.last_lsn);
    1670            2 :         if (worker.last_send_time == 0)
    1671            0 :             nulls[5] = true;
    1672              :         else
    1673            2 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
    1674            2 :         if (worker.last_recv_time == 0)
    1675            0 :             nulls[6] = true;
    1676              :         else
    1677            2 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1678            2 :         if (!XLogRecPtrIsValid(worker.reply_lsn))
    1679            0 :             nulls[7] = true;
    1680              :         else
    1681            2 :             values[7] = LSNGetDatum(worker.reply_lsn);
    1682            2 :         if (worker.reply_time == 0)
    1683            0 :             nulls[8] = true;
    1684              :         else
    1685            2 :             values[8] = TimestampTzGetDatum(worker.reply_time);
    1686              : 
    1687            2 :         switch (worker.type)
    1688              :         {
    1689            2 :             case WORKERTYPE_APPLY:
    1690            2 :                 values[9] = CStringGetTextDatum("apply");
    1691            2 :                 break;
    1692            0 :             case WORKERTYPE_PARALLEL_APPLY:
    1693            0 :                 values[9] = CStringGetTextDatum("parallel apply");
    1694            0 :                 break;
    1695            0 :             case WORKERTYPE_SEQUENCESYNC:
    1696            0 :                 values[9] = CStringGetTextDatum("sequence synchronization");
    1697            0 :                 break;
    1698            0 :             case WORKERTYPE_TABLESYNC:
    1699            0 :                 values[9] = CStringGetTextDatum("table synchronization");
    1700            0 :                 break;
    1701            0 :             case WORKERTYPE_UNKNOWN:
    1702              :                 /* Should never happen. */
    1703            0 :                 elog(ERROR, "unknown worker type");
    1704              :         }
    1705              : 
    1706            2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1707              :                              values, nulls);
    1708              : 
    1709              :         /*
    1710              :          * If only a single subscription was requested, and we found it,
    1711              :          * break.
    1712              :          */
    1713            2 :         if (OidIsValid(subid))
    1714            0 :             break;
    1715              :     }
    1716              : 
    1717            1 :     LWLockRelease(LogicalRepWorkerLock);
    1718              : 
    1719            1 :     return (Datum) 0;
    1720              : }
        

Generated by: LCOV version 2.0-1