LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions)

Test: PostgreSQL 19devel                          Hit    Total    Coverage
Date: 2025-11-16 08:18:22      Lines:             505      564      89.5 %
                               Functions:          37       37     100.0 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
      11             :  *    This module contains the logical replication worker launcher, which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "access/heapam.h"
      21             : #include "access/htup.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/pg_subscription.h"
      26             : #include "catalog/pg_subscription_rel.h"
      27             : #include "funcapi.h"
      28             : #include "lib/dshash.h"
      29             : #include "miscadmin.h"
      30             : #include "pgstat.h"
      31             : #include "postmaster/bgworker.h"
      32             : #include "postmaster/interrupt.h"
      33             : #include "replication/logicallauncher.h"
      34             : #include "replication/origin.h"
      35             : #include "replication/slot.h"
      36             : #include "replication/walreceiver.h"
      37             : #include "replication/worker_internal.h"
      38             : #include "storage/ipc.h"
      39             : #include "storage/proc.h"
      40             : #include "storage/procarray.h"
      41             : #include "tcop/tcopprot.h"
      42             : #include "utils/builtins.h"
      43             : #include "utils/memutils.h"
      44             : #include "utils/pg_lsn.h"
      45             : #include "utils/snapmgr.h"
      46             : #include "utils/syscache.h"
      47             : 
      48             : /* max sleep time between cycles (3min) */
      49             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      50             : 
      51             : /* GUC variables */
      52             : int         max_logical_replication_workers = 4;
      53             : int         max_sync_workers_per_subscription = 2;
      54             : int         max_parallel_apply_workers_per_subscription = 2;
      55             : 
      56             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      57             : 
      58             : typedef struct LogicalRepCtxStruct
      59             : {
      60             :     /* Supervisor process. */
      61             :     pid_t       launcher_pid;
      62             : 
      63             :     /* Hash table holding last start times of subscriptions' apply workers. */
      64             :     dsa_handle  last_start_dsa;
      65             :     dshash_table_handle last_start_dsh;
      66             : 
      67             :     /* Background workers. */
      68             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      69             : } LogicalRepCtxStruct;
      70             : 
      71             : static LogicalRepCtxStruct *LogicalRepCtx;
      72             : 
      73             : /* an entry in the last-start-times shared hash table */
      74             : typedef struct LauncherLastStartTimesEntry
      75             : {
      76             :     Oid         subid;          /* OID of logrep subscription (hash key) */
      77             :     TimestampTz last_start_time;    /* last time its apply worker was started */
      78             : } LauncherLastStartTimesEntry;
      79             : 
      80             : /* parameters for the last-start-times shared hash table */
      81             : static const dshash_parameters dsh_params = {
      82             :     sizeof(Oid),
      83             :     sizeof(LauncherLastStartTimesEntry),
      84             :     dshash_memcmp,
      85             :     dshash_memhash,
      86             :     dshash_memcpy,
      87             :     LWTRANCHE_LAUNCHER_HASH
      88             : };
      89             : 
      90             : static dsa_area *last_start_times_dsa = NULL;
      91             : static dshash_table *last_start_times = NULL;
      92             : 
      93             : static bool on_commit_launcher_wakeup = false;
      94             : 
      95             : 
      96             : static void logicalrep_launcher_onexit(int code, Datum arg);
      97             : static void logicalrep_worker_onexit(int code, Datum arg);
      98             : static void logicalrep_worker_detach(void);
      99             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
     100             : static int  logicalrep_pa_worker_count(Oid subid);
     101             : static void logicalrep_launcher_attach_dshmem(void);
     102             : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
     103             : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
     104             : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
     105             : static bool acquire_conflict_slot_if_exists(void);
     106             : static void update_conflict_slot_xmin(TransactionId new_xmin);
     107             : static void init_conflict_slot_xmin(void);
     108             : 
     109             : 
     110             : /*
     111             :  * Load the list of subscriptions.
     112             :  *
     113             :  * Only the fields interesting for worker start/stop functions are filled for
     114             :  * each subscription.
     115             :  */
     116             : static List *
     117        5764 : get_subscription_list(void)
     118             : {
     119        5764 :     List       *res = NIL;
     120             :     Relation    rel;
     121             :     TableScanDesc scan;
     122             :     HeapTuple   tup;
     123             :     MemoryContext resultcxt;
     124             : 
     125             :     /* This is the context that we will allocate our output data in */
     126        5764 :     resultcxt = CurrentMemoryContext;
     127             : 
     128             :     /*
     129             :      * Start a transaction so we can access pg_subscription.
     130             :      */
     131        5764 :     StartTransactionCommand();
     132             : 
     133        5764 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     134        5764 :     scan = table_beginscan_catalog(rel, 0, NULL);
     135             : 
     136        7512 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     137             :     {
     138        1748 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     139             :         Subscription *sub;
     140             :         MemoryContext oldcxt;
     141             : 
     142             :         /*
     143             :          * Allocate our results in the caller's context, not the
     144             :          * transaction's. We do this inside the loop, and restore the original
     145             :          * context at the end, so that leaky things like heap_getnext() are
     146             :          * not called in a potentially long-lived context.
     147             :          */
     148        1748 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     149             : 
     150        1748 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     151        1748 :         sub->oid = subform->oid;
     152        1748 :         sub->dbid = subform->subdbid;
     153        1748 :         sub->owner = subform->subowner;
     154        1748 :         sub->enabled = subform->subenabled;
     155        1748 :         sub->name = pstrdup(NameStr(subform->subname));
     156        1748 :         sub->retaindeadtuples = subform->subretaindeadtuples;
     157        1748 :         sub->retentionactive = subform->subretentionactive;
     158             :         /* We don't fill fields we are not interested in. */
     159             : 
     160        1748 :         res = lappend(res, sub);
     161        1748 :         MemoryContextSwitchTo(oldcxt);
     162             :     }
     163             : 
     164        5764 :     table_endscan(scan);
     165        5764 :     table_close(rel, AccessShareLock);
     166             : 
     167        5764 :     CommitTransactionCommand();
     168             : 
     169        5764 :     return res;
     170             : }
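
The context switching above is a standard PostgreSQL idiom worth calling out: anything that must outlive CommitTransactionCommand() is allocated in the caller's context, while per-tuple leakage from things like heap_getnext() stays in the short-lived transaction context. A minimal sketch of the idiom (illustrative only, not part of launcher.c):

    static List *
    collect_longlived_results(void)
    {
        /* Capture the caller's context before the transaction switches it. */
        MemoryContext resultcxt = CurrentMemoryContext;
        List       *res = NIL;

        StartTransactionCommand();

        /* ... per-tuple catalog work may leak into the transaction context ... */
        {
            MemoryContext oldcxt = MemoryContextSwitchTo(resultcxt);

            res = lappend_oid(res, InvalidOid); /* long-lived allocation */
            MemoryContextSwitchTo(oldcxt);
        }

        /* Frees transaction-local allocations; "res" survives. */
        CommitTransactionCommand();

        return res;
    }
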
     171             : 
     172             : /*
     173             :  * Wait for a background worker to start up and attach to the shmem context.
     174             :  *
     175             :  * This is only needed for cleaning up the shared memory in case the worker
     176             :  * fails to attach.
     177             :  *
     178             :  * Returns whether the attach was successful.
     179             :  */
     180             : static bool
     181         838 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     182             :                                uint16 generation,
     183             :                                BackgroundWorkerHandle *handle)
     184             : {
     185         838 :     bool        result = false;
     186         838 :     bool        dropped_latch = false;
     187             : 
     188             :     for (;;)
     189        3042 :     {
     190             :         BgwHandleStatus status;
     191             :         pid_t       pid;
     192             :         int         rc;
     193             : 
     194        3880 :         CHECK_FOR_INTERRUPTS();
     195             : 
     196        3880 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     197             : 
     198             :         /* Worker either died or has started. Return false if died. */
     199        3880 :         if (!worker->in_use || worker->proc)
     200             :         {
     201         838 :             result = worker->in_use;
     202         838 :             LWLockRelease(LogicalRepWorkerLock);
     203         838 :             break;
     204             :         }
     205             : 
     206        3042 :         LWLockRelease(LogicalRepWorkerLock);
     207             : 
     208             :         /* Check if worker has died before attaching, and clean up after it. */
     209        3042 :         status = GetBackgroundWorkerPid(handle, &pid);
     210             : 
     211        3042 :         if (status == BGWH_STOPPED)
     212             :         {
     213           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     214             :             /* Ensure that this was indeed the worker we waited for. */
     215           0 :             if (generation == worker->generation)
     216           0 :                 logicalrep_worker_cleanup(worker);
     217           0 :             LWLockRelease(LogicalRepWorkerLock);
     218           0 :             break;              /* result is already false */
     219             :         }
     220             : 
     221             :         /*
      222             :          * We need a timeout because we generally don't get notified via latch
     223             :          * about the worker attach.  But we don't expect to have to wait long.
     224             :          */
     225        3042 :         rc = WaitLatch(MyLatch,
     226             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     227             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     228             : 
     229        3042 :         if (rc & WL_LATCH_SET)
     230             :         {
     231        1010 :             ResetLatch(MyLatch);
     232        1010 :             CHECK_FOR_INTERRUPTS();
     233        1010 :             dropped_latch = true;
     234             :         }
     235             :     }
     236             : 
     237             :     /*
     238             :      * If we had to clear a latch event in order to wait, be sure to restore
     239             :      * it before exiting.  Otherwise caller may miss events.
     240             :      */
     241         838 :     if (dropped_latch)
     242         838 :         SetLatch(MyLatch);
     243             : 
     244         838 :     return result;
     245             : }
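
The loop above is an instance of the standard latch protocol: wait with a short timeout (worker attach does not reliably set our latch), reset the latch once it fires, re-check shared state, and re-set the latch on exit so the caller does not lose an event we consumed. A generic sketch of that protocol, assuming a caller-supplied condition_met() predicate:

    static void
    wait_for_condition(void)
    {
        bool        dropped_latch = false;

        while (!condition_met())    /* condition_met() is assumed */
        {
            int         rc;

            rc = WaitLatch(MyLatch,
                           WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                           10L /* ms */, WAIT_EVENT_BGWORKER_STARTUP);

            if (rc & WL_LATCH_SET)
            {
                ResetLatch(MyLatch);    /* reset before re-checking state */
                CHECK_FOR_INTERRUPTS();
                dropped_latch = true;
            }
        }

        /* Restore any latch event we consumed, so the caller won't miss it. */
        if (dropped_latch)
            SetLatch(MyLatch);
    }
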
     246             : 
     247             : /*
      248             :  * Walks the workers array and searches for one that matches the given worker type,
     249             :  * subscription id, and relation id.
     250             :  *
     251             :  * For both apply workers and sequencesync workers, the relid should be set to
     252             :  * InvalidOid, as these workers handle changes across all tables and sequences
     253             :  * respectively, rather than targeting a specific relation. For tablesync
     254             :  * workers, the relid should be set to the OID of the relation being
     255             :  * synchronized.
     256             :  */
     257             : LogicalRepWorker *
     258        6034 : logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
     259             :                        bool only_running)
     260             : {
     261             :     int         i;
     262        6034 :     LogicalRepWorker *res = NULL;
     263             : 
     264             :     /* relid must be valid only for table sync workers */
     265             :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
     266             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     267             : 
     268             :     /* Search for an attached worker that matches the specified criteria. */
     269       18208 :     for (i = 0; i < max_logical_replication_workers; i++)
     270             :     {
     271       15926 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     272             : 
     273             :         /* Skip parallel apply workers. */
     274       15926 :         if (isParallelApplyWorker(w))
     275           0 :             continue;
     276             : 
     277       15926 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     278        3808 :             w->type == wtype && (!only_running || w->proc))
     279             :         {
     280        3752 :             res = w;
     281        3752 :             break;
     282             :         }
     283             :     }
     284             : 
     285        6034 :     return res;
     286             : }
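
Because a slot can be recycled as soon as LogicalRepWorkerLock is released, callers must hold the lock across both the lookup and any use of the returned pointer. A usage sketch (subid is an assumed variable; this mirrors logicalrep_worker_wakeup() below):

    LogicalRepWorker *worker;

    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    worker = logicalrep_worker_find(WORKERTYPE_APPLY, subid, InvalidOid,
                                    true /* only_running */ );
    if (worker)
        logicalrep_worker_wakeup_ptr(worker);   /* lock still held: safe */
    LWLockRelease(LogicalRepWorkerLock);
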
     287             : 
     288             : /*
     289             :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     290             :  * the subscription, instead of just one.
     291             :  */
     292             : List *
     293        1310 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
     294             : {
     295             :     int         i;
     296        1310 :     List       *res = NIL;
     297             : 
     298        1310 :     if (acquire_lock)
     299         238 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     300             : 
     301             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     302             : 
     303             :     /* Search for attached worker for a given subscription id. */
     304        6786 :     for (i = 0; i < max_logical_replication_workers; i++)
     305             :     {
     306        5476 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     307             : 
     308        5476 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     309         936 :             res = lappend(res, w);
     310             :     }
     311             : 
     312        1310 :     if (acquire_lock)
     313         238 :         LWLockRelease(LogicalRepWorkerLock);
     314             : 
     315        1310 :     return res;
     316             : }
     317             : 
     318             : /*
     319             :  * Start new logical replication background worker, if possible.
     320             :  *
     321             :  * Returns true on success, false on failure.
     322             :  */
     323             : bool
     324         838 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
     325             :                          Oid dbid, Oid subid, const char *subname, Oid userid,
     326             :                          Oid relid, dsm_handle subworker_dsm,
     327             :                          bool retain_dead_tuples)
     328             : {
     329             :     BackgroundWorker bgw;
     330             :     BackgroundWorkerHandle *bgw_handle;
     331             :     uint16      generation;
     332             :     int         i;
     333         838 :     int         slot = 0;
     334         838 :     LogicalRepWorker *worker = NULL;
     335             :     int         nsyncworkers;
     336             :     int         nparallelapplyworkers;
     337             :     TimestampTz now;
     338         838 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
     339         838 :     bool        is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
     340         838 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
     341             : 
     342             :     /*----------
     343             :      * Sanity checks:
     344             :      * - must be valid worker type
     345             :      * - tablesync workers are only ones to have relid
     346             :      * - parallel apply worker is the only kind of subworker
     347             :      * - The replication slot used in conflict detection is created when
     348             :      *   retain_dead_tuples is enabled
     349             :      */
     350             :     Assert(wtype != WORKERTYPE_UNKNOWN);
     351             :     Assert(is_tablesync_worker == OidIsValid(relid));
     352             :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
     353             :     Assert(!retain_dead_tuples || MyReplicationSlot);
     354             : 
     355         838 :     ereport(DEBUG1,
     356             :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     357             :                              subname)));
     358             : 
     359             :     /* Report this after the initial starting message for consistency. */
     360         838 :     if (max_active_replication_origins == 0)
     361           0 :         ereport(ERROR,
     362             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     363             :                  errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
     364             : 
     365             :     /*
     366             :      * We need to do the modification of the shared memory under lock so that
     367             :      * we have consistent view.
     368             :      */
     369         838 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     370             : 
     371         838 : retry:
     372             :     /* Find unused worker slot. */
     373        1466 :     for (i = 0; i < max_logical_replication_workers; i++)
     374             :     {
     375        1466 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     376             : 
     377        1466 :         if (!w->in_use)
     378             :         {
     379         838 :             worker = w;
     380         838 :             slot = i;
     381         838 :             break;
     382             :         }
     383             :     }
     384             : 
     385         838 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     386             : 
     387         838 :     now = GetCurrentTimestamp();
     388             : 
     389             :     /*
      390             :      * If we didn't find a free slot, try to do garbage collection: if some
      391             :      * worker failed to start up and its parent crashed while waiting, the
      392             :      * in_use state was never cleared.
     393             :      */
     394         838 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     395             :     {
     396           0 :         bool        did_cleanup = false;
     397             : 
     398           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     399             :         {
     400           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     401             : 
     402             :             /*
     403             :              * If the worker was marked in use but didn't manage to attach in
     404             :              * time, clean it up.
     405             :              */
     406           0 :             if (w->in_use && !w->proc &&
     407           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     408             :                                            wal_receiver_timeout))
     409             :             {
     410           0 :                 elog(WARNING,
     411             :                      "logical replication worker for subscription %u took too long to start; canceled",
     412             :                      w->subid);
     413             : 
     414           0 :                 logicalrep_worker_cleanup(w);
     415           0 :                 did_cleanup = true;
     416             :             }
     417             :         }
     418             : 
     419           0 :         if (did_cleanup)
     420           0 :             goto retry;
     421             :     }
     422             : 
     423             :     /*
      424             :      * We don't allow invoking more sync workers once the per-subscription
      425             :      * sync worker limit has been reached. So, just return silently, as we
      426             :      * might get here because of an otherwise harmless race condition.
     427             :      */
     428         838 :     if ((is_tablesync_worker || is_sequencesync_worker) &&
     429         410 :         nsyncworkers >= max_sync_workers_per_subscription)
     430             :     {
     431           0 :         LWLockRelease(LogicalRepWorkerLock);
     432           0 :         return false;
     433             :     }
     434             : 
     435         838 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     436             : 
     437             :     /*
     438             :      * Return false if the number of parallel apply workers reached the limit
     439             :      * per subscription.
     440             :      */
     441         838 :     if (is_parallel_apply_worker &&
     442          20 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     443             :     {
     444           0 :         LWLockRelease(LogicalRepWorkerLock);
     445           0 :         return false;
     446             :     }
     447             : 
     448             :     /*
      449             :      * However, if there are no free worker slots, inform the user before
      450             :      * exiting.
     451             :      */
     452         838 :     if (worker == NULL)
     453             :     {
     454           0 :         LWLockRelease(LogicalRepWorkerLock);
     455           0 :         ereport(WARNING,
     456             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     457             :                  errmsg("out of logical replication worker slots"),
     458             :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
     459           0 :         return false;
     460             :     }
     461             : 
     462             :     /* Prepare the worker slot. */
     463         838 :     worker->type = wtype;
     464         838 :     worker->launch_time = now;
     465         838 :     worker->in_use = true;
     466         838 :     worker->generation++;
     467         838 :     worker->proc = NULL;
     468         838 :     worker->dbid = dbid;
     469         838 :     worker->userid = userid;
     470         838 :     worker->subid = subid;
     471         838 :     worker->relid = relid;
     472         838 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     473         838 :     worker->relstate_lsn = InvalidXLogRecPtr;
     474         838 :     worker->stream_fileset = NULL;
     475         838 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     476         838 :     worker->parallel_apply = is_parallel_apply_worker;
     477         838 :     worker->oldest_nonremovable_xid = retain_dead_tuples
     478           4 :         ? MyReplicationSlot->data.xmin
     479         838 :         : InvalidTransactionId;
     480         838 :     worker->last_lsn = InvalidXLogRecPtr;
     481         838 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     482         838 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     483         838 :     worker->reply_lsn = InvalidXLogRecPtr;
     484         838 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     485         838 :     worker->last_seqsync_start_time = 0;
     486             : 
     487             :     /* Before releasing lock, remember generation for future identification. */
     488         838 :     generation = worker->generation;
     489             : 
     490         838 :     LWLockRelease(LogicalRepWorkerLock);
     491             : 
     492             :     /* Register the new dynamic worker. */
     493         838 :     memset(&bgw, 0, sizeof(bgw));
     494         838 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     495             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     496         838 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     497         838 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     498             : 
     499         838 :     switch (worker->type)
     500             :     {
     501         408 :         case WORKERTYPE_APPLY:
     502         408 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     503         408 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     504             :                      "logical replication apply worker for subscription %u",
     505             :                      subid);
     506         408 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
     507         408 :             break;
     508             : 
     509          20 :         case WORKERTYPE_PARALLEL_APPLY:
     510          20 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     511          20 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     512             :                      "logical replication parallel apply worker for subscription %u",
     513             :                      subid);
     514          20 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     515             : 
     516          20 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     517          20 :             break;
     518             : 
     519          18 :         case WORKERTYPE_SEQUENCESYNC:
     520          18 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
     521          18 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     522             :                      "logical replication sequencesync worker for subscription %u",
     523             :                      subid);
     524          18 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
     525          18 :             break;
     526             : 
     527         392 :         case WORKERTYPE_TABLESYNC:
     528         392 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
     529         392 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     530             :                      "logical replication tablesync worker for subscription %u sync %u",
     531             :                      subid,
     532             :                      relid);
     533         392 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
     534         392 :             break;
     535             : 
     536           0 :         case WORKERTYPE_UNKNOWN:
     537             :             /* Should never happen. */
     538           0 :             elog(ERROR, "unknown worker type");
     539             :     }
     540             : 
     541         838 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     542         838 :     bgw.bgw_notify_pid = MyProcPid;
     543         838 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     544             : 
     545         838 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     546             :     {
     547             :         /* Failed to start worker, so clean up the worker slot. */
     548           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     549             :         Assert(generation == worker->generation);
     550           0 :         logicalrep_worker_cleanup(worker);
     551           0 :         LWLockRelease(LogicalRepWorkerLock);
     552             : 
     553           0 :         ereport(WARNING,
     554             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     555             :                  errmsg("out of background worker slots"),
     556             :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
     557           0 :         return false;
     558             :     }
     559             : 
     560             :     /* Now wait until it attaches. */
     561         838 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     562             : }
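
For orientation, this is roughly how the launcher's main loop (not shown in this section) starts a subscription's apply worker. In this sketch, "sub" stands in for a Subscription returned by get_subscription_list(), and the final argument is simplified; the real caller derives it from the subscription's retain-dead-tuples settings:

    logicalrep_worker_launch(WORKERTYPE_APPLY,
                             sub->dbid,          /* database to connect to */
                             sub->oid,           /* subscription OID */
                             sub->name,
                             sub->owner,         /* run as subscription owner */
                             InvalidOid,         /* apply workers have no relid */
                             DSM_HANDLE_INVALID, /* not a parallel apply worker */
                             false);             /* retain_dead_tuples (simplified) */
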
     563             : 
     564             : /*
     565             :  * Internal function to stop the worker and wait until it detaches from the
     566             :  * slot.
     567             :  */
     568             : static void
     569         164 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
     570             : {
     571             :     uint16      generation;
     572             : 
     573             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
     574             : 
     575             :     /*
      576             :      * Remember our worker's generation so we can later check whether the
      577             :      * slot still holds the same worker.
     578             :      */
     579         164 :     generation = worker->generation;
     580             : 
     581             :     /*
     582             :      * If we found a worker but it does not have proc set then it is still
     583             :      * starting up; wait for it to finish starting and then kill it.
     584             :      */
     585         166 :     while (worker->in_use && !worker->proc)
     586             :     {
     587             :         int         rc;
     588             : 
     589           8 :         LWLockRelease(LogicalRepWorkerLock);
     590             : 
     591             :         /* Wait a bit --- we don't expect to have to wait long. */
     592           8 :         rc = WaitLatch(MyLatch,
     593             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     594             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     595             : 
     596           8 :         if (rc & WL_LATCH_SET)
     597             :         {
     598           2 :             ResetLatch(MyLatch);
     599           2 :             CHECK_FOR_INTERRUPTS();
     600             :         }
     601             : 
     602             :         /* Recheck worker status. */
     603           8 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     604             : 
     605             :         /*
     606             :          * Check whether the worker slot is no longer used, which would mean
     607             :          * that the worker has exited, or whether the worker generation is
     608             :          * different, meaning that a different worker has taken the slot.
     609             :          */
     610           8 :         if (!worker->in_use || worker->generation != generation)
     611           0 :             return;
     612             : 
     613             :         /* Worker has assigned proc, so it has started. */
     614           8 :         if (worker->proc)
     615           6 :             break;
     616             :     }
     617             : 
     618             :     /* Now terminate the worker ... */
     619         164 :     kill(worker->proc->pid, signo);
     620             : 
     621             :     /* ... and wait for it to die. */
     622             :     for (;;)
     623         242 :     {
     624             :         int         rc;
     625             : 
     626             :         /* is it gone? */
     627         406 :         if (!worker->proc || worker->generation != generation)
     628             :             break;
     629             : 
     630         242 :         LWLockRelease(LogicalRepWorkerLock);
     631             : 
     632             :         /* Wait a bit --- we don't expect to have to wait long. */
     633         242 :         rc = WaitLatch(MyLatch,
     634             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     635             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     636             : 
     637         242 :         if (rc & WL_LATCH_SET)
     638             :         {
     639         114 :             ResetLatch(MyLatch);
     640         114 :             CHECK_FOR_INTERRUPTS();
     641             :         }
     642             : 
     643         242 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     644             :     }
     645             : }
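
Both wait loops above rely on the slot's generation counter: while the lock is dropped, the slot may be recycled for a different worker, so callers capture worker->generation up front and trust the slot only while it still matches. A hypothetical helper (not in launcher.c) expressing that check:

    static bool
    slot_still_ours(LogicalRepWorker *worker, uint16 saved_generation)
    {
        Assert(LWLockHeldByMe(LogicalRepWorkerLock));

        return worker->in_use && worker->generation == saved_generation;
    }
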
     646             : 
     647             : /*
     648             :  * Stop the logical replication worker that matches the specified worker type,
     649             :  * subscription id, and relation id.
     650             :  */
     651             : void
     652         190 : logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
     653             : {
     654             :     LogicalRepWorker *worker;
     655             : 
     656             :     /* relid must be valid only for table sync workers */
     657             :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
     658             : 
     659         190 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     660             : 
     661         190 :     worker = logicalrep_worker_find(wtype, subid, relid, false);
     662             : 
     663         190 :     if (worker)
     664             :     {
     665             :         Assert(!isParallelApplyWorker(worker));
     666         148 :         logicalrep_worker_stop_internal(worker, SIGTERM);
     667             :     }
     668             : 
     669         190 :     LWLockRelease(LogicalRepWorkerLock);
     670         190 : }
     671             : 
     672             : /*
     673             :  * Stop the given logical replication parallel apply worker.
     674             :  *
      675             :  * Note that the function sends SIGUSR2 instead of SIGTERM to the parallel apply
     676             :  * worker so that the worker exits cleanly.
     677             :  */
     678             : void
     679          10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
     680             : {
     681             :     int         slot_no;
     682             :     uint16      generation;
     683             :     LogicalRepWorker *worker;
     684             : 
     685          10 :     SpinLockAcquire(&winfo->shared->mutex);
     686          10 :     generation = winfo->shared->logicalrep_worker_generation;
     687          10 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
     688          10 :     SpinLockRelease(&winfo->shared->mutex);
     689             : 
     690             :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
     691             : 
     692             :     /*
     693             :      * Detach from the error_mq_handle for the parallel apply worker before
     694             :      * stopping it. This prevents the leader apply worker from trying to
      695             :      * receive a message from an error queue that the parallel apply worker
      696             :      * might already have detached.
     697             :      */
     698          10 :     if (winfo->error_mq_handle)
     699             :     {
     700          10 :         shm_mq_detach(winfo->error_mq_handle);
     701          10 :         winfo->error_mq_handle = NULL;
     702             :     }
     703             : 
     704          10 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     705             : 
     706          10 :     worker = &LogicalRepCtx->workers[slot_no];
     707             :     Assert(isParallelApplyWorker(worker));
     708             : 
     709             :     /*
     710             :      * Only stop the worker if the generation matches and the worker is alive.
     711             :      */
     712          10 :     if (worker->generation == generation && worker->proc)
     713          10 :         logicalrep_worker_stop_internal(worker, SIGUSR2);
     714             : 
     715          10 :     LWLockRelease(LogicalRepWorkerLock);
     716          10 : }
     717             : 
     718             : /*
     719             :  * Wake up (using latch) any logical replication worker that matches the
     720             :  * specified worker type, subscription id, and relation id.
     721             :  */
     722             : void
     723         422 : logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
     724             : {
     725             :     LogicalRepWorker *worker;
     726             : 
     727             :     /* relid must be valid only for table sync workers */
     728             :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
     729             : 
     730         422 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     731             : 
     732         422 :     worker = logicalrep_worker_find(wtype, subid, relid, true);
     733             : 
     734         422 :     if (worker)
     735         422 :         logicalrep_worker_wakeup_ptr(worker);
     736             : 
     737         422 :     LWLockRelease(LogicalRepWorkerLock);
     738         422 : }
     739             : 
     740             : /*
     741             :  * Wake up (using latch) the specified logical replication worker.
     742             :  *
     743             :  * Caller must hold lock, else worker->proc could change under us.
     744             :  */
     745             : void
     746        1300 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     747             : {
     748             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     749             : 
     750        1300 :     SetLatch(&worker->proc->procLatch);
     751        1300 : }
     752             : 
     753             : /*
     754             :  * Attach to a slot.
     755             :  */
     756             : void
     757        1068 : logicalrep_worker_attach(int slot)
     758             : {
     759             :     /* Block concurrent access. */
     760        1068 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     761             : 
     762             :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     763        1068 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     764             : 
     765        1068 :     if (!MyLogicalRepWorker->in_use)
     766             :     {
     767           0 :         LWLockRelease(LogicalRepWorkerLock);
     768           0 :         ereport(ERROR,
     769             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     770             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     771             :                         slot)));
     772             :     }
     773             : 
     774        1068 :     if (MyLogicalRepWorker->proc)
     775             :     {
     776           0 :         LWLockRelease(LogicalRepWorkerLock);
     777           0 :         ereport(ERROR,
     778             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     779             :                  errmsg("logical replication worker slot %d is already used by "
     780             :                         "another worker, cannot attach", slot)));
     781             :     }
     782             : 
     783        1068 :     MyLogicalRepWorker->proc = MyProc;
     784        1068 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     785             : 
     786        1068 :     LWLockRelease(LogicalRepWorkerLock);
     787        1068 : }
     788             : 
     789             : /*
     790             :  * Stop the parallel apply workers if any, and detach the leader apply worker
     791             :  * (cleans up the worker info).
     792             :  */
     793             : static void
     794        1068 : logicalrep_worker_detach(void)
     795             : {
     796             :     /* Stop the parallel apply workers. */
     797        1068 :     if (am_leader_apply_worker())
     798             :     {
     799             :         List       *workers;
     800             :         ListCell   *lc;
     801             : 
     802             :         /*
     803             :          * Detach from the error_mq_handle for all parallel apply workers
     804             :          * before terminating them. This prevents the leader apply worker from
      805             :          * receiving the worker termination message and logging it when the
      806             :          * parallel worker has already done so.
     807             :          */
     808         634 :         pa_detach_all_error_mq();
     809             : 
     810         634 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     811             : 
     812         634 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
     813        1278 :         foreach(lc, workers)
     814             :         {
     815         644 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     816             : 
     817         644 :             if (isParallelApplyWorker(w))
     818           6 :                 logicalrep_worker_stop_internal(w, SIGTERM);
     819             :         }
     820             : 
     821         634 :         LWLockRelease(LogicalRepWorkerLock);
     822             : 
     823         634 :         list_free(workers);
     824             :     }
     825             : 
     826             :     /* Block concurrent access. */
     827        1068 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     828             : 
     829        1068 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     830             : 
     831        1068 :     LWLockRelease(LogicalRepWorkerLock);
     832        1068 : }
     833             : 
     834             : /*
     835             :  * Clean up worker info.
     836             :  */
     837             : static void
     838        1068 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     839             : {
     840             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     841             : 
     842        1068 :     worker->type = WORKERTYPE_UNKNOWN;
     843        1068 :     worker->in_use = false;
     844        1068 :     worker->proc = NULL;
     845        1068 :     worker->dbid = InvalidOid;
     846        1068 :     worker->userid = InvalidOid;
     847        1068 :     worker->subid = InvalidOid;
     848        1068 :     worker->relid = InvalidOid;
     849        1068 :     worker->leader_pid = InvalidPid;
     850        1068 :     worker->parallel_apply = false;
     851        1068 : }
     852             : 
     853             : /*
     854             :  * Cleanup function for logical replication launcher.
     855             :  *
     856             :  * Called on logical replication launcher exit.
     857             :  */
     858             : static void
     859         860 : logicalrep_launcher_onexit(int code, Datum arg)
     860             : {
     861         860 :     LogicalRepCtx->launcher_pid = 0;
     862         860 : }
     863             : 
     864             : /*
      865             :  * Reset the sequencesync worker's last_seqsync_start_time, which is kept
      866             :  * in the subscription's apply worker.
     867             :  *
     868             :  * Note that this value is not stored in the sequencesync worker, because that
     869             :  * has finished already and is about to exit.
     870             :  */
     871             : void
     872          10 : logicalrep_reset_seqsync_start_time(void)
     873             : {
     874             :     LogicalRepWorker *worker;
     875             : 
     876             :     /*
     877             :      * The apply worker can't access last_seqsync_start_time concurrently, so
      878             :      * it is okay to use a SHARED lock here. See ProcessSequencesForSync().
     879             :      */
     880          10 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     881             : 
     882          10 :     worker = logicalrep_worker_find(WORKERTYPE_APPLY,
     883          10 :                                     MyLogicalRepWorker->subid, InvalidOid,
     884             :                                     true);
     885          10 :     if (worker)
     886          10 :         worker->last_seqsync_start_time = 0;
     887             : 
     888          10 :     LWLockRelease(LogicalRepWorkerLock);
     889          10 : }
     890             : 
     891             : /*
     892             :  * Cleanup function.
     893             :  *
     894             :  * Called on logical replication worker exit.
     895             :  */
     896             : static void
     897        1068 : logicalrep_worker_onexit(int code, Datum arg)
     898             : {
     899             :     /* Disconnect gracefully from the remote side. */
     900        1068 :     if (LogRepWorkerWalRcvConn)
     901         846 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
     902             : 
     903        1068 :     logicalrep_worker_detach();
     904             : 
     905             :     /* Cleanup fileset used for streaming transactions. */
     906        1068 :     if (MyLogicalRepWorker->stream_fileset != NULL)
     907          28 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     908             : 
     909             :     /*
     910             :      * Session level locks may be acquired outside of a transaction in
     911             :      * parallel apply mode and will not be released when the worker
     912             :      * terminates, so manually release all locks before the worker exits.
     913             :      *
     914             :      * The locks will be acquired once the worker is initialized.
     915             :      */
     916        1068 :     if (!InitializingApplyWorker)
     917         948 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
     918             : 
     919        1068 :     ApplyLauncherWakeup();
     920        1068 : }
     921             : 
     922             : /*
     923             :  * Count the number of registered (not necessarily running) sync workers
     924             :  * for a subscription.
     925             :  */
     926             : int
     927        2436 : logicalrep_sync_worker_count(Oid subid)
     928             : {
     929             :     int         i;
     930        2436 :     int         res = 0;
     931             : 
     932             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     933             : 
     934             :     /* Search for attached worker for a given subscription id. */
     935       12576 :     for (i = 0; i < max_logical_replication_workers; i++)
     936             :     {
     937       10140 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     938             : 
     939       10140 :         if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
     940        2586 :             res++;
     941             :     }
     942             : 
     943        2436 :     return res;
     944             : }
     945             : 
     946             : /*
     947             :  * Count the number of registered (but not necessarily running) parallel apply
     948             :  * workers for a subscription.
     949             :  */
     950             : static int
     951         838 : logicalrep_pa_worker_count(Oid subid)
     952             : {
     953             :     int         i;
     954         838 :     int         res = 0;
     955             : 
     956             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     957             : 
     958             :     /*
     959             :      * Scan all attached parallel apply workers, only counting those which
     960             :      * have the given subscription id.
     961             :      */
     962        4430 :     for (i = 0; i < max_logical_replication_workers; i++)
     963             :     {
     964        3592 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     965             : 
     966        3592 :         if (isParallelApplyWorker(w) && w->subid == subid)
     967           4 :             res++;
     968             :     }
     969             : 
     970         838 :     return res;
     971             : }
     972             : 
     973             : /*
     974             :  * ApplyLauncherShmemSize
     975             :  *      Compute space needed for replication launcher shared memory
     976             :  */
     977             : Size
     978        8500 : ApplyLauncherShmemSize(void)
     979             : {
     980             :     Size        size;
     981             : 
     982             :     /*
     983             :      * Need the fixed struct and the array of LogicalRepWorker.
     984             :      */
     985        8500 :     size = sizeof(LogicalRepCtxStruct);
     986        8500 :     size = MAXALIGN(size);
     987        8500 :     size = add_size(size, mul_size(max_logical_replication_workers,
     988             :                                    sizeof(LogicalRepWorker)));
     989        8500 :     return size;
     990             : }
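
add_size() and mul_size() are PostgreSQL's overflow-checked Size arithmetic: they raise an error instead of silently wrapping, which matters because max_logical_replication_workers scales this request. As a worked illustration (not from the report), with the default of 4 workers the reservation is the MAXALIGN'ed fixed struct plus 4 * sizeof(LogicalRepWorker).
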
     991             : 
     992             : /*
     993             :  * ApplyLauncherRegister
     994             :  *      Register a background worker running the logical replication launcher.
     995             :  */
     996             : void
     997        1766 : ApplyLauncherRegister(void)
     998             : {
     999             :     BackgroundWorker bgw;
    1000             : 
    1001             :     /*
    1002             :      * The logical replication launcher is disabled during binary upgrades, to
    1003             :      * prevent logical replication workers from running on the source cluster.
    1004             :      * That could cause replication origins to move forward after having been
    1005             :      * copied to the target cluster, potentially creating conflicts with the
    1006             :      * copied data files.
    1007             :      */
    1008        1766 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
    1009         106 :         return;
    1010             : 
    1011        1660 :     memset(&bgw, 0, sizeof(bgw));
    1012        1660 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
    1013             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
    1014        1660 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
    1015        1660 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
    1016        1660 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
    1017        1660 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
    1018             :              "logical replication launcher");
    1019        1660 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
    1020             :              "logical replication launcher");
    1021        1660 :     bgw.bgw_restart_time = 5;
    1022        1660 :     bgw.bgw_notify_pid = 0;
    1023        1660 :     bgw.bgw_main_arg = (Datum) 0;
    1024             : 
    1025        1660 :     RegisterBackgroundWorker(&bgw);
    1026             : }
    1027             : 
    1028             : /*
    1029             :  * ApplyLauncherShmemInit
    1030             :  *      Allocate and initialize replication launcher shared memory
    1031             :  */
    1032             : void
    1033        2200 : ApplyLauncherShmemInit(void)
    1034             : {
    1035             :     bool        found;
    1036             : 
    1037        2200 :     LogicalRepCtx = (LogicalRepCtxStruct *)
    1038        2200 :         ShmemInitStruct("Logical Replication Launcher Data",
    1039             :                         ApplyLauncherShmemSize(),
    1040             :                         &found);
    1041             : 
    1042        2200 :     if (!found)
    1043             :     {
    1044             :         int         slot;
    1045             : 
    1046        2200 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
    1047             : 
    1048        2200 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
    1049        2200 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
    1050             : 
    1051             :         /* Initialize memory and spin locks for each worker slot. */
    1052       10926 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
    1053             :         {
    1054        8726 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
    1055             : 
    1056        8726 :             memset(worker, 0, sizeof(LogicalRepWorker));
    1057        8726 :             SpinLockInit(&worker->relmutex);
    1058             :         }
    1059             :     }
    1060        2200 : }
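
The !found branch above is the standard ShmemInitStruct() idiom: exactly one process sees found == false and initializes the struct, while every later caller attaches to the already-initialized memory. A minimal self-contained sketch of the same idiom, with hypothetical names:

    /* Hypothetical sketch of the ShmemInitStruct() create-or-attach idiom. */
    #include "postgres.h"
    #include "storage/shmem.h"
    #include "storage/spin.h"

    typedef struct MySharedState
    {
        slock_t     mutex;          /* protects counter */
        uint64      counter;
    } MySharedState;

    static MySharedState *MyState = NULL;

    static void
    my_shmem_init(void)
    {
        bool        found;

        MyState = (MySharedState *)
            ShmemInitStruct("My Shared State", sizeof(MySharedState), &found);

        if (!found)
        {
            /* First time through: initialize the shared struct. */
            SpinLockInit(&MyState->mutex);
            MyState->counter = 0;
        }
    }

In an extension this would be paired with a shmem_request_hook calling RequestAddinShmemSpace(sizeof(MySharedState)), so the space is reserved at startup.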
    1061             : 
    1062             : /*
    1063             :  * Initialize or attach to the dynamic shared hash table that stores the
    1064             :  * last-start times, if not already done.
    1065             :  * This must be called before accessing the table.
    1066             :  */
    1067             : static void
    1068        1502 : logicalrep_launcher_attach_dshmem(void)
    1069             : {
    1070             :     MemoryContext oldcontext;
    1071             : 
    1072             :     /* Quick exit if we already did this. */
    1073        1502 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
    1074        1392 :         last_start_times != NULL)
    1075        1018 :         return;
    1076             : 
    1077             :     /* Otherwise, use a lock to ensure only one process creates the table. */
    1078         484 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    1079             : 
    1080             :     /* Be sure any local memory allocated by DSA routines is persistent. */
    1081         484 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1082             : 
    1083         484 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
    1084             :     {
    1085             :         /* Initialize dynamic shared hash table for last-start times. */
    1086         110 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
    1087         110 :         dsa_pin(last_start_times_dsa);
    1088         110 :         dsa_pin_mapping(last_start_times_dsa);
    1089         110 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
    1090             : 
    1091             :         /* Store handles in shared memory for other backends to use. */
    1092         110 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
    1093         110 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
    1094             :     }
    1095         374 :     else if (!last_start_times)
    1096             :     {
    1097             :         /* Attach to existing dynamic shared hash table. */
    1098         374 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
    1099         374 :         dsa_pin_mapping(last_start_times_dsa);
    1100         374 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
    1101         374 :                                          LogicalRepCtx->last_start_dsh, NULL);
    1102             :     }
    1103             : 
    1104         484 :     MemoryContextSwitchTo(oldcontext);
    1105         484 :     LWLockRelease(LogicalRepWorkerLock);
    1106             : }
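
The dsh_params structure referenced by the dshash_create()/dshash_attach() calls above is defined earlier in this file, outside this excerpt. For an Oid-keyed table of LauncherLastStartTimesEntry rows, a definition of roughly this shape (field layout per lib/dshash.h; treat the exact values as illustrative) would match those calls:

    static const dshash_parameters dsh_params = {
        sizeof(Oid),                            /* key size */
        sizeof(LauncherLastStartTimesEntry),    /* entry size */
        dshash_memcmp,                          /* compare keys as raw bytes */
        dshash_memhash,                         /* hash keys as raw bytes */
        dshash_memcpy,                          /* copy keys as raw bytes */
        LWTRANCHE_LAUNCHER_HASH                 /* LWLock tranche for partitions */
    };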
    1107             : 
    1108             : /*
    1109             :  * Set the last-start time for the subscription.
    1110             :  */
    1111             : static void
    1112         408 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
    1113             : {
    1114             :     LauncherLastStartTimesEntry *entry;
    1115             :     bool        found;
    1116             : 
    1117         408 :     logicalrep_launcher_attach_dshmem();
    1118             : 
    1119         408 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
    1120         408 :     entry->last_start_time = start_time;
    1121         408 :     dshash_release_lock(last_start_times, entry);
    1122         408 : }
    1123             : 
    1124             : /*
    1125             :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1126             :  */
    1127             : static TimestampTz
    1128         642 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1129             : {
    1130             :     LauncherLastStartTimesEntry *entry;
    1131             :     TimestampTz ret;
    1132             : 
    1133         642 :     logicalrep_launcher_attach_dshmem();
    1134             : 
    1135         642 :     entry = dshash_find(last_start_times, &subid, false);
    1136         642 :     if (entry == NULL)
    1137         284 :         return 0;
    1138             : 
    1139         358 :     ret = entry->last_start_time;
    1140         358 :     dshash_release_lock(last_start_times, entry);
    1141             : 
    1142         358 :     return ret;
    1143             : }
    1144             : 
    1145             : /*
    1146             :  * Remove the last-start-time entry for the subscription, if one exists.
    1147             :  *
    1148             :  * This has two use-cases: to remove the entry related to a subscription
    1149             :  * that's been deleted or disabled (just to avoid leaking shared memory),
    1150             :  * and to allow immediate restart of an apply worker that has exited
    1151             :  * due to subscription parameter changes.
    1152             :  */
    1153             : void
    1154         452 : ApplyLauncherForgetWorkerStartTime(Oid subid)
    1155             : {
    1156         452 :     logicalrep_launcher_attach_dshmem();
    1157             : 
    1158         452 :     (void) dshash_delete_key(last_start_times, &subid);
    1159         452 : }
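
Note the locking protocol shared by the three functions above: dshash_find() and dshash_find_or_insert() return with the entry's partition lock held, so every successful lookup must be paired with dshash_release_lock() before doing anything else with the table. A hypothetical helper in this file, following the same pattern:

    /*
     * Hypothetical: did this subscription's apply worker start within the
     * last threshold_ms milliseconds?
     */
    static bool
    sub_started_recently(Oid subid, long threshold_ms)
    {
        LauncherLastStartTimesEntry *entry;
        TimestampTz last;

        logicalrep_launcher_attach_dshmem();

        entry = dshash_find(last_start_times, &subid, false);
        if (entry == NULL)
            return false;       /* no entry, so no lock is held */

        last = entry->last_start_time;
        dshash_release_lock(last_start_times, entry);   /* drop partition lock */

        return TimestampDifferenceMilliseconds(last,
                                               GetCurrentTimestamp()) < threshold_ms;
    }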
    1160             : 
    1161             : /*
    1162             :  * Wake up the launcher on commit if requested.

    1163             :  */
    1164             : void
    1165     1110776 : AtEOXact_ApplyLauncher(bool isCommit)
    1166             : {
    1167     1110776 :     if (isCommit)
    1168             :     {
    1169     1059618 :         if (on_commit_launcher_wakeup)
    1170         288 :             ApplyLauncherWakeup();
    1171             :     }
    1172             : 
    1173     1110776 :     on_commit_launcher_wakeup = false;
    1174     1110776 : }
    1175             : 
    1176             : /*
    1177             :  * Request wakeup of the launcher on commit of the transaction.
    1178             :  *
    1179             :  * This is used to signal the launcher to stop sleeping and process the
    1180             :  * subscriptions when the current transaction commits. It should be used
    1181             :  * when a new tuple has been added to the pg_subscription catalog.
    1182             :  */
    1183             : void
    1184         290 : ApplyLauncherWakeupAtCommit(void)
    1185             : {
    1186         290 :     if (!on_commit_launcher_wakeup)
    1187         288 :         on_commit_launcher_wakeup = true;
    1188         290 : }
    1189             : 
    1190             : /*
    1191             :  * Wake up the launcher immediately.
    1192             :  */
    1193             : void
    1194        1428 : ApplyLauncherWakeup(void)
    1195             : {
    1196        1428 :     if (LogicalRepCtx->launcher_pid != 0)
    1197        1394 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1198        1428 : }
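
A typical caller of this pair is DDL that adds or enables a subscription: the new pg_subscription tuple is invisible to the launcher until commit, so the signal is deferred via ApplyLauncherWakeupAtCommit() and actually delivered by AtEOXact_ApplyLauncher(). A hypothetical sketch of such a call site:

    /* Hypothetical excerpt from a CREATE SUBSCRIPTION-style code path. */
    static void
    my_create_subscription(void)
    {
        /* ... insert the new tuple into pg_subscription ... */

        /*
         * Signalling now would be useless: the launcher cannot see the
         * uncommitted row.  Request a wakeup at commit instead; if the
         * transaction aborts, no signal is sent.
         */
        ApplyLauncherWakeupAtCommit();
    }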
    1199             : 
    1200             : /*
    1201             :  * Main loop for the apply launcher process.
    1202             :  */
    1203             : void
    1204         860 : ApplyLauncherMain(Datum main_arg)
    1205             : {
    1206         860 :     ereport(DEBUG1,
    1207             :             (errmsg_internal("logical replication launcher started")));
    1208             : 
    1209         860 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1210             : 
    1211             :     Assert(LogicalRepCtx->launcher_pid == 0);
    1212         860 :     LogicalRepCtx->launcher_pid = MyProcPid;
    1213             : 
    1214             :     /* Establish signal handlers. */
    1215         860 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1216         860 :     pqsignal(SIGTERM, die);
    1217         860 :     BackgroundWorkerUnblockSignals();
    1218             : 
    1219             :     /*
    1220             :      * Establish connection to nailed catalogs (we only ever access
    1221             :      * pg_subscription).
    1222             :      */
    1223         860 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1224             : 
    1225             :     /*
    1226             :      * Acquire the conflict detection slot at startup to ensure it can be
    1227             :      * dropped if no longer needed after a restart.
    1228             :      */
    1229         860 :     acquire_conflict_slot_if_exists();
    1230             : 
    1231             :     /* Enter main loop */
    1232             :     for (;;)
    1233        4906 :     {
    1234             :         int         rc;
    1235             :         List       *sublist;
    1236             :         ListCell   *lc;
    1237             :         MemoryContext subctx;
    1238             :         MemoryContext oldctx;
    1239        5766 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1240        5766 :         bool        can_update_xmin = true;
    1241        5766 :         bool        retain_dead_tuples = false;
    1242        5766 :         TransactionId xmin = InvalidTransactionId;
    1243             : 
    1244        5766 :         CHECK_FOR_INTERRUPTS();
    1245             : 
    1246             :         /* Use temporary context to avoid leaking memory across cycles. */
    1247        5764 :         subctx = AllocSetContextCreate(TopMemoryContext,
    1248             :                                        "Logical Replication Launcher sublist",
    1249             :                                        ALLOCSET_DEFAULT_SIZES);
    1250        5764 :         oldctx = MemoryContextSwitchTo(subctx);
    1251             : 
    1252             :         /*
    1253             :          * Start any missing workers for enabled subscriptions.
    1254             :          *
    1255             :          * Also, during the iteration through all subscriptions, we compute
    1256             :          * the minimum XID required to protect deleted tuples for conflict
    1257             :          * detection if one of the subscription enables retain_dead_tuples
    1258             :          * option.
    1259             :          */
    1260        5764 :         sublist = get_subscription_list();
    1261        7512 :         foreach(lc, sublist)
    1262             :         {
    1263        1748 :             Subscription *sub = (Subscription *) lfirst(lc);
    1264             :             LogicalRepWorker *w;
    1265             :             TimestampTz last_start;
    1266             :             TimestampTz now;
    1267             :             long        elapsed;
    1268             : 
    1269        1748 :             if (sub->retaindeadtuples)
    1270             :             {
    1271          90 :                 retain_dead_tuples = true;
    1272             : 
    1273             :                 /*
    1274             :                  * Create a replication slot to retain information necessary
    1275             :                  * for conflict detection such as dead tuples, commit
    1276             :                  * timestamps, and origins.
    1277             :                  *
    1278             :                  * The slot is created before starting the apply worker to
    1279             :                  * prevent it from unnecessarily maintaining its
    1280             :                  * oldest_nonremovable_xid.
    1281             :                  *
    1282             :                  * The slot is created even for a disabled subscription to
    1283             :                  * ensure that conflict-related information is available when
    1284             :                  * applying remote changes that occurred before the
    1285             :                  * subscription was enabled.
    1286             :                  */
    1287          90 :                 CreateConflictDetectionSlot();
    1288             : 
    1289          90 :                 if (sub->retentionactive)
    1290             :                 {
    1291             :                     /*
    1292             :                      * Can't advance the xmin of the slot unless all the
    1293             :                      * subscriptions actively retaining dead tuples are
    1294             :                      * enabled. Advancing the xmin of
    1295             :                      * CONFLICT_DETECTION_SLOT while one of those
    1296             :                      * subscriptions is disabled would prevent us from
    1297             :                      * reliably detecting conflicts for that subscription
    1298             :                      * even though it has set the retain_dead_tuples
    1299             :                      * option.
    1300             :                      */
    1301          90 :                     can_update_xmin &= sub->enabled;
    1302             : 
    1303             :                     /*
    1304             :                      * Initialize the slot once the subscription activates
    1305             :                      * retention.
    1306             :                      */
    1307          90 :                     if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
    1308           0 :                         init_conflict_slot_xmin();
    1309             :                 }
    1310             :             }
    1311             : 
    1312        1748 :             if (!sub->enabled)
    1313          82 :                 continue;
    1314             : 
    1315        1666 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1316        1666 :             w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
    1317             :                                        false);
    1318             : 
    1319        1666 :             if (w != NULL)
    1320             :             {
    1321             :                 /*
    1322             :                  * Compute the minimum xmin required to protect dead tuples
    1323             :                  * required for conflict detection among all running apply
    1324             :                  * workers. This computation is performed while holding
    1325             :                  * LogicalRepWorkerLock to prevent accessing invalid worker
    1326             :                  * data, in scenarios where a worker might exit and reset its
    1327             :                  * state concurrently.
    1328             :                  */
    1329        1024 :                 if (sub->retaindeadtuples &&
    1330          80 :                     sub->retentionactive &&
    1331             :                     can_update_xmin)
    1332          80 :                     compute_min_nonremovable_xid(w, &xmin);
    1333             : 
    1334        1024 :                 LWLockRelease(LogicalRepWorkerLock);
    1335             : 
    1336             :                 /* worker is running already */
    1337        1024 :                 continue;
    1338             :             }
    1339             : 
    1340         642 :             LWLockRelease(LogicalRepWorkerLock);
    1341             : 
    1342             :             /*
    1343             :              * Can't advance the slot's xmin unless all the workers
    1344             :              * corresponding to subscriptions actively retaining dead tuples
    1345             :              * are running, so disable any further computation of the
    1346             :              * minimum nonremovable xid.
    1347             :              */
    1348         642 :             if (sub->retaindeadtuples && sub->retentionactive)
    1349           4 :                 can_update_xmin = false;
    1350             : 
    1351             :             /*
    1352             :              * If the worker is eligible to start now, launch it.  Otherwise,
    1353             :              * adjust wait_time so that we'll wake up as soon as it can be
    1354             :              * started.
    1355             :              *
    1356             :              * Each subscription's apply worker can only be restarted once per
    1357             :              * wal_retrieve_retry_interval, so that errors do not cause us to
    1358             :              * repeatedly restart the worker as fast as possible.  In cases
    1359             :              * where a restart is expected (e.g., subscription parameter
    1360             :              * changes), another process should remove the last-start entry
    1361             :              * for the subscription so that the worker can be restarted
    1362             :              * without waiting for wal_retrieve_retry_interval to elapse.
    1363             :              */
    1364         642 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1365         642 :             now = GetCurrentTimestamp();
    1366         642 :             if (last_start == 0 ||
    1367         358 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1368             :             {
    1369         408 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1370         408 :                 if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
    1371         408 :                                               sub->dbid, sub->oid, sub->name,
    1372             :                                               sub->owner, InvalidOid,
    1373             :                                               DSM_HANDLE_INVALID,
    1374         412 :                                               sub->retaindeadtuples &&
    1375           4 :                                               sub->retentionactive))
    1376             :                 {
    1377             :                     /*
    1378             :                      * We get here either if we failed to launch a worker
    1379             :                      * (perhaps for resource-exhaustion reasons) or if we
    1380             :                      * launched one but it immediately quit.  Either way, it
    1381             :                      * seems appropriate to try again after
    1382             :                      * wal_retrieve_retry_interval.
    1383             :                      */
    1384          24 :                     wait_time = Min(wait_time,
    1385             :                                     wal_retrieve_retry_interval);
    1386             :                 }
    1387             :             }
    1388             :             else
    1389             :             {
    1390         234 :                 wait_time = Min(wait_time,
    1391             :                                 wal_retrieve_retry_interval - elapsed);
    1392             :             }
    1393             :         }
    1394             : 
    1395             :         /*
    1396             :          * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
    1397             :          * that requires us to retain dead tuples. Otherwise, if required,
    1398             :          * advance the slot's xmin to protect dead tuples required for the
    1399             :          * conflict detection.
    1400             :          *
    1401             :          * Additionally, if all apply workers for subscriptions with
    1402             :          * retain_dead_tuples enabled have requested to stop retention, the
    1403             :          * slot's xmin will be set to InvalidTransactionId, allowing the
    1404             :          * removal of dead tuples.
    1405             :          */
    1406        5764 :         if (MyReplicationSlot)
    1407             :         {
    1408          92 :             if (!retain_dead_tuples)
    1409           2 :                 ReplicationSlotDropAcquired();
    1410          90 :             else if (can_update_xmin)
    1411          80 :                 update_conflict_slot_xmin(xmin);
    1412             :         }
    1413             : 
    1414             :         /* Switch back to original memory context. */
    1415        5764 :         MemoryContextSwitchTo(oldctx);
    1416             :         /* Clean the temporary memory. */
    1417        5764 :         MemoryContextDelete(subctx);
    1418             : 
    1419             :         /* Wait for more work. */
    1420        5764 :         rc = WaitLatch(MyLatch,
    1421             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1422             :                        wait_time,
    1423             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1424             : 
    1425        5758 :         if (rc & WL_LATCH_SET)
    1426             :         {
    1427        5714 :             ResetLatch(MyLatch);
    1428        5714 :             CHECK_FOR_INTERRUPTS();
    1429             :         }
    1430             : 
    1431        4906 :         if (ConfigReloadPending)
    1432             :         {
    1433          72 :             ConfigReloadPending = false;
    1434          72 :             ProcessConfigFile(PGC_SIGHUP);
    1435             :         }
    1436             :     }
    1437             : 
    1438             :     /* Not reachable */
    1439             : }
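
To make the wait_time bookkeeping concrete, assume the default wal_retrieve_retry_interval of 5000 ms. If an apply worker has exited and its subscription's last-start entry is 2000 ms old, the loop above skips the launch and sets wait_time = Min(180000, 5000 - 2000) = 3000 ms, so the launcher wakes up just as the worker becomes eligible again. If a launch is attempted but fails, wait_time becomes Min(180000, 5000) = 5000 ms. With no pending work at all, the launcher sleeps for the full DEFAULT_NAPTIME_PER_CYCLE (180000 ms) unless its latch is set first.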
    1440             : 
    1441             : /*
    1442             :  * Determine the minimum non-removable transaction ID across all apply workers
    1443             :  * for subscriptions that have retain_dead_tuples enabled. Store the result
    1444             :  * in *xmin.
    1445             :  */
    1446             : static void
    1447          80 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
    1448             : {
    1449             :     TransactionId nonremovable_xid;
    1450             : 
    1451             :     Assert(worker != NULL);
    1452             : 
    1453             :     /*
    1454             :      * The replication slot for conflict detection must be created before the
    1455             :      * worker starts.
    1456             :      */
    1457             :     Assert(MyReplicationSlot);
    1458             : 
    1459          80 :     SpinLockAcquire(&worker->relmutex);
    1460          80 :     nonremovable_xid = worker->oldest_nonremovable_xid;
    1461          80 :     SpinLockRelease(&worker->relmutex);
    1462             : 
    1463             :     /*
    1464             :      * Return if the apply worker has stopped retention concurrently.
    1465             :      *
    1466             :      * Although this function is invoked only when retentionactive is true,
    1467             :      * the apply worker might stop retention after the launcher fetches the
    1468             :      * retentionactive flag.
    1469             :      */
    1470          80 :     if (!TransactionIdIsValid(nonremovable_xid))
    1471           0 :         return;
    1472             : 
    1473          80 :     if (!TransactionIdIsValid(*xmin) ||
    1474           0 :         TransactionIdPrecedes(nonremovable_xid, *xmin))
    1475          80 :         *xmin = nonremovable_xid;
    1476             : }
    1477             : 
    1478             : /*
    1479             :  * Acquire the replication slot used to retain information for conflict
    1480             :  * detection, if it exists.
    1481             :  *
    1482             :  * Return true if successfully acquired, otherwise return false.
    1483             :  */
    1484             : static bool
    1485         860 : acquire_conflict_slot_if_exists(void)
    1486             : {
    1487         860 :     if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
    1488         858 :         return false;
    1489             : 
    1490           2 :     ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
    1491           2 :     return true;
    1492             : }
    1493             : 
    1494             : /*
    1495             :  * Update the xmin of the replication slot used to retain information
    1496             :  * required for conflict detection.
    1497             :  */
    1498             : static void
    1499          80 : update_conflict_slot_xmin(TransactionId new_xmin)
    1500             : {
    1501             :     Assert(MyReplicationSlot);
    1502             :     Assert(!TransactionIdIsValid(new_xmin) ||
    1503             :            TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
    1504             : 
    1505             :     /* Nothing to do if the slot's xmin is already the requested value */
    1506          80 :     if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
    1507          56 :         return;
    1508             : 
    1509          24 :     SpinLockAcquire(&MyReplicationSlot->mutex);
    1510          24 :     MyReplicationSlot->effective_xmin = new_xmin;
    1511          24 :     MyReplicationSlot->data.xmin = new_xmin;
    1512          24 :     SpinLockRelease(&MyReplicationSlot->mutex);
    1513             : 
    1514          24 :     elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
    1515             : 
    1516          24 :     ReplicationSlotMarkDirty();
    1517          24 :     ReplicationSlotsComputeRequiredXmin(false);
    1518             : 
    1519             :     /*
    1520             :      * Like PhysicalConfirmReceivedLocation(), do not save slot information
    1521             :      * each time. This is acceptable because all concurrent transactions on
    1522             :      * the publisher that require the data preceding the slot's xmin should
    1523             :      * have already been applied and flushed on the subscriber before the xmin
    1524             :      * is advanced. So, even if the slot's xmin regresses after a restart, it
    1525             :      * will be advanced again in the next cycle. Therefore, no data required
    1526             :      * for conflict detection will be prematurely removed.
    1527             :      */
    1528          24 :     return;
    1529             : }
    1530             : 
    1531             : /*
    1532             :  * Initialize the xmin for the conflict detection slot.
    1533             :  */
    1534             : static void
    1535           8 : init_conflict_slot_xmin(void)
    1536             : {
    1537             :     TransactionId xmin_horizon;
    1538             : 
    1539             :     /* The replication slot must exist but must not be initialized yet. */
    1540             :     Assert(MyReplicationSlot &&
    1541             :            !TransactionIdIsValid(MyReplicationSlot->data.xmin));
    1542             : 
    1543           8 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    1544             : 
    1545           8 :     xmin_horizon = GetOldestSafeDecodingTransactionId(false);
    1546             : 
    1547           8 :     SpinLockAcquire(&MyReplicationSlot->mutex);
    1548           8 :     MyReplicationSlot->effective_xmin = xmin_horizon;
    1549           8 :     MyReplicationSlot->data.xmin = xmin_horizon;
    1550           8 :     SpinLockRelease(&MyReplicationSlot->mutex);
    1551             : 
    1552           8 :     ReplicationSlotsComputeRequiredXmin(true);
    1553             : 
    1554           8 :     LWLockRelease(ProcArrayLock);
    1555             : 
    1556             :     /* Write this slot to disk */
    1557           8 :     ReplicationSlotMarkDirty();
    1558           8 :     ReplicationSlotSave();
    1559           8 : }
    1560             : 
    1561             : /*
    1562             :  * Create and acquire the replication slot used to retain information for
    1563             :  * conflict detection, if not already done.
    1564             :  */
    1565             : void
    1566          92 : CreateConflictDetectionSlot(void)
    1567             : {
    1568             :     /* Exit early if the replication slot is already created and acquired */
    1569          92 :     if (MyReplicationSlot)
    1570          84 :         return;
    1571             : 
    1572           8 :     ereport(LOG,
    1573             :             errmsg("creating replication conflict detection slot"));
    1574             : 
    1575           8 :     ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
    1576             :                           false, false);
    1577             : 
    1578           8 :     init_conflict_slot_xmin();
    1579             : }
    1580             : 
    1581             : /*
    1582             :  * Is current process the logical replication launcher?
    1583             :  */
    1584             : bool
    1585        4920 : IsLogicalLauncher(void)
    1586             : {
    1587        4920 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1588             : }
    1589             : 
    1590             : /*
    1591             :  * Return the pid of the leader apply worker if the given pid is the pid of a
    1592             :  * parallel apply worker; otherwise, return InvalidPid.
    1593             :  */
    1594             : pid_t
    1595        2104 : GetLeaderApplyWorkerPid(pid_t pid)
    1596             : {
    1597        2104 :     int         leader_pid = InvalidPid;
    1598             :     int         i;
    1599             : 
    1600        2104 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1601             : 
    1602       10520 :     for (i = 0; i < max_logical_replication_workers; i++)
    1603             :     {
    1604        8416 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1605             : 
    1606        8416 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
    1607             :         {
    1608           0 :             leader_pid = w->leader_pid;
    1609           0 :             break;
    1610             :         }
    1611             :     }
    1612             : 
    1613        2104 :     LWLockRelease(LogicalRepWorkerLock);
    1614             : 
    1615        2104 :     return leader_pid;
    1616             : }
    1617             : 
    1618             : /*
    1619             :  * Returns the state of the subscriptions.
    1620             :  */
    1621             : Datum
    1622           2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1623             : {
    1624             : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
    1625           2 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1626             :     int         i;
    1627           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1628             : 
    1629           2 :     InitMaterializedSRF(fcinfo, 0);
    1630             : 
    1631             :     /* Make sure we get consistent view of the workers. */
    1632           2 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1633             : 
    1634          10 :     for (i = 0; i < max_logical_replication_workers; i++)
    1635             :     {
    1636             :         /* for each row */
    1637           8 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1638           8 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1639             :         int         worker_pid;
    1640             :         LogicalRepWorker worker;
    1641             : 
    1642           8 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1643             :                sizeof(LogicalRepWorker));
    1644           8 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1645           4 :             continue;
    1646             : 
    1647           4 :         if (OidIsValid(subid) && worker.subid != subid)
    1648           0 :             continue;
    1649             : 
    1650           4 :         worker_pid = worker.proc->pid;
    1651             : 
    1652           4 :         values[0] = ObjectIdGetDatum(worker.subid);
    1653           4 :         if (isTableSyncWorker(&worker))
    1654           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1655             :         else
    1656           4 :             nulls[1] = true;
    1657           4 :         values[2] = Int32GetDatum(worker_pid);
    1658             : 
    1659           4 :         if (isParallelApplyWorker(&worker))
    1660           0 :             values[3] = Int32GetDatum(worker.leader_pid);
    1661             :         else
    1662           4 :             nulls[3] = true;
    1663             : 
    1664           4 :         if (!XLogRecPtrIsValid(worker.last_lsn))
    1665           2 :             nulls[4] = true;
    1666             :         else
    1667           2 :             values[4] = LSNGetDatum(worker.last_lsn);
    1668           4 :         if (worker.last_send_time == 0)
    1669           0 :             nulls[5] = true;
    1670             :         else
    1671           4 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
    1672           4 :         if (worker.last_recv_time == 0)
    1673           0 :             nulls[6] = true;
    1674             :         else
    1675           4 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1676           4 :         if (!XLogRecPtrIsValid(worker.reply_lsn))
    1677           2 :             nulls[7] = true;
    1678             :         else
    1679           2 :             values[7] = LSNGetDatum(worker.reply_lsn);
    1680           4 :         if (worker.reply_time == 0)
    1681           0 :             nulls[8] = true;
    1682             :         else
    1683           4 :             values[8] = TimestampTzGetDatum(worker.reply_time);
    1684             : 
    1685           4 :         switch (worker.type)
    1686             :         {
    1687           4 :             case WORKERTYPE_APPLY:
    1688           4 :                 values[9] = CStringGetTextDatum("apply");
    1689           4 :                 break;
    1690           0 :             case WORKERTYPE_PARALLEL_APPLY:
    1691           0 :                 values[9] = CStringGetTextDatum("parallel apply");
    1692           0 :                 break;
    1693           0 :             case WORKERTYPE_SEQUENCESYNC:
    1694           0 :                 values[9] = CStringGetTextDatum("sequence synchronization");
    1695           0 :                 break;
    1696           0 :             case WORKERTYPE_TABLESYNC:
    1697           0 :                 values[9] = CStringGetTextDatum("table synchronization");
    1698           0 :                 break;
    1699           0 :             case WORKERTYPE_UNKNOWN:
    1700             :                 /* Should never happen. */
    1701           0 :                 elog(ERROR, "unknown worker type");
    1702             :         }
    1703             : 
    1704           4 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1705             :                              values, nulls);
    1706             : 
    1707             :         /*
    1708             :          * If only a single subscription was requested, and we found it,
    1709             :          * break.
    1710             :          */
    1711           4 :         if (OidIsValid(subid))
    1712           0 :             break;
    1713             :     }
    1714             : 
    1715           2 :     LWLockRelease(LogicalRepWorkerLock);
    1716             : 
    1717           2 :     return (Datum) 0;
    1718             : }
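
The function above follows the standard materialized set-returning-function pattern: InitMaterializedSRF() builds a tuplestore in rsinfo->setResult, each row is appended with tuplestore_putvalues(), and the function returns (Datum) 0 because the rows travel via the tuplestore. A minimal self-contained sketch of the same pattern (hypothetical function name; the SQL-level declaration must match the two output columns):

    #include "postgres.h"
    #include "funcapi.h"
    #include "utils/builtins.h"
    #include "utils/tuplestore.h"

    PG_MODULE_MAGIC;

    PG_FUNCTION_INFO_V1(my_two_rows);

    Datum
    my_two_rows(PG_FUNCTION_ARGS)
    {
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

        /* Sets up rsinfo->setResult / setDesc and validates the context. */
        InitMaterializedSRF(fcinfo, 0);

        for (int i = 0; i < 2; i++)
        {
            Datum       values[2] = {0};
            bool        nulls[2] = {0};

            values[0] = Int32GetDatum(i);
            values[1] = CStringGetTextDatum(i == 0 ? "first" : "second");

            tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                                 values, nulls);
        }

        return (Datum) 0;
    }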

Generated by: LCOV version 1.16