LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions)
Test:      PostgreSQL 19devel
Test Date: 2026-02-17 17:20:33
Coverage:  Lines: 89.2 % (505 of 566)    Functions: 100.0 % (37 of 37)

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  * launcher.c
       3              :  *     PostgreSQL logical replication worker launcher process
       4              :  *
       5              :  * Copyright (c) 2016-2026, PostgreSQL Global Development Group
       6              :  *
       7              :  * IDENTIFICATION
       8              :  *    src/backend/replication/logical/launcher.c
       9              :  *
      10              :  * NOTES
      11              :  *    This module contains the logical replication worker launcher which
      12              :  *    uses the background worker infrastructure to start the logical
      13              :  *    replication workers for every enabled subscription.
      14              :  *
      15              :  *-------------------------------------------------------------------------
      16              :  */
      17              : 
      18              : #include "postgres.h"
      19              : 
      20              : #include "access/heapam.h"
      21              : #include "access/htup.h"
      22              : #include "access/htup_details.h"
      23              : #include "access/tableam.h"
      24              : #include "access/xact.h"
      25              : #include "catalog/pg_subscription.h"
      26              : #include "catalog/pg_subscription_rel.h"
      27              : #include "funcapi.h"
      28              : #include "lib/dshash.h"
      29              : #include "miscadmin.h"
      30              : #include "pgstat.h"
      31              : #include "postmaster/bgworker.h"
      32              : #include "postmaster/interrupt.h"
      33              : #include "replication/logicallauncher.h"
      34              : #include "replication/origin.h"
      35              : #include "replication/slot.h"
      36              : #include "replication/walreceiver.h"
      37              : #include "replication/worker_internal.h"
      38              : #include "storage/ipc.h"
      39              : #include "storage/proc.h"
      40              : #include "storage/procarray.h"
      41              : #include "tcop/tcopprot.h"
      42              : #include "utils/builtins.h"
      43              : #include "utils/memutils.h"
      44              : #include "utils/pg_lsn.h"
      45              : #include "utils/snapmgr.h"
      46              : #include "utils/syscache.h"
      47              : 
      48              : /* max sleep time between cycles (3min) */
      49              : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      50              : 
      51              : /* GUC variables */
      52              : int         max_logical_replication_workers = 4;
      53              : int         max_sync_workers_per_subscription = 2;
      54              : int         max_parallel_apply_workers_per_subscription = 2;
      55              : 
      56              : LogicalRepWorker *MyLogicalRepWorker = NULL;
      57              : 
      58              : typedef struct LogicalRepCtxStruct
      59              : {
      60              :     /* Supervisor process. */
      61              :     pid_t       launcher_pid;
      62              : 
      63              :     /* Hash table holding last start times of subscriptions' apply workers. */
      64              :     dsa_handle  last_start_dsa;
      65              :     dshash_table_handle last_start_dsh;
      66              : 
      67              :     /* Background workers. */
      68              :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      69              : } LogicalRepCtxStruct;
      70              : 
      71              : static LogicalRepCtxStruct *LogicalRepCtx;
      72              : 
      73              : /* an entry in the last-start-times shared hash table */
      74              : typedef struct LauncherLastStartTimesEntry
      75              : {
      76              :     Oid         subid;          /* OID of logrep subscription (hash key) */
      77              :     TimestampTz last_start_time;    /* last time its apply worker was started */
      78              : } LauncherLastStartTimesEntry;
      79              : 
      80              : /* parameters for the last-start-times shared hash table */
      81              : static const dshash_parameters dsh_params = {
      82              :     sizeof(Oid),
      83              :     sizeof(LauncherLastStartTimesEntry),
      84              :     dshash_memcmp,
      85              :     dshash_memhash,
      86              :     dshash_memcpy,
      87              :     LWTRANCHE_LAUNCHER_HASH
      88              : };
      89              : 
      90              : static dsa_area *last_start_times_dsa = NULL;
      91              : static dshash_table *last_start_times = NULL;
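
The dsh_params table above backs a dshash keyed by subscription OID. The bodies of ApplyLauncherSetWorkerStartTime() and ApplyLauncherGetWorkerStartTime(), declared just below, fall outside this excerpt, so here is a minimal sketch of the access pattern they plausibly use; dshash_find_or_insert() and dshash_release_lock() are the real dshash API, while the surrounding function is illustrative:

    static void
    set_worker_start_time(Oid subid, TimestampTz start_time)
    {
        LauncherLastStartTimesEntry *entry;
        bool        found;

        logicalrep_launcher_attach_dshmem();

        /* Entries are created on first use and stay locked while held. */
        entry = dshash_find_or_insert(last_start_times, &subid, &found);
        entry->last_start_time = start_time;
        dshash_release_lock(last_start_times, entry);
    }
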
      92              : 
      93              : static bool on_commit_launcher_wakeup = false;
      94              : 
      95              : 
      96              : static void logicalrep_launcher_onexit(int code, Datum arg);
      97              : static void logicalrep_worker_onexit(int code, Datum arg);
      98              : static void logicalrep_worker_detach(void);
      99              : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
     100              : static int  logicalrep_pa_worker_count(Oid subid);
     101              : static void logicalrep_launcher_attach_dshmem(void);
     102              : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
     103              : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
     104              : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
     105              : static bool acquire_conflict_slot_if_exists(void);
     106              : static void update_conflict_slot_xmin(TransactionId new_xmin);
     107              : static void init_conflict_slot_xmin(void);
     108              : 
     109              : 
     110              : /*
     111              :  * Load the list of subscriptions.
     112              :  *
     113              :  * Only the fields interesting for worker start/stop functions are filled for
     114              :  * each subscription.
     115              :  */
     116              : static List *
     117         2962 : get_subscription_list(void)
     118              : {
     119         2962 :     List       *res = NIL;
     120              :     Relation    rel;
     121              :     TableScanDesc scan;
     122              :     HeapTuple   tup;
     123              :     MemoryContext resultcxt;
     124              : 
     125              :     /* This is the context that we will allocate our output data in */
     126         2962 :     resultcxt = CurrentMemoryContext;
     127              : 
     128              :     /*
     129              :      * Start a transaction so we can access pg_subscription.
     130              :      */
     131         2962 :     StartTransactionCommand();
     132              : 
     133         2962 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     134         2962 :     scan = table_beginscan_catalog(rel, 0, NULL);
     135              : 
     136         3885 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     137              :     {
     138          923 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     139              :         Subscription *sub;
     140              :         MemoryContext oldcxt;
     141              : 
     142              :         /*
     143              :          * Allocate our results in the caller's context, not the
     144              :          * transaction's. We do this inside the loop, and restore the original
     145              :          * context at the end, so that leaky things like heap_getnext() are
     146              :          * not called in a potentially long-lived context.
     147              :          */
     148          923 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     149              : 
     150          923 :         sub = palloc0_object(Subscription);
     151          923 :         sub->oid = subform->oid;
     152          923 :         sub->dbid = subform->subdbid;
     153          923 :         sub->owner = subform->subowner;
     154          923 :         sub->enabled = subform->subenabled;
     155          923 :         sub->name = pstrdup(NameStr(subform->subname));
     156          923 :         sub->retaindeadtuples = subform->subretaindeadtuples;
     157          923 :         sub->retentionactive = subform->subretentionactive;
     158              :         /* We don't fill fields we are not interested in. */
     159              : 
     160          923 :         res = lappend(res, sub);
     161          923 :         MemoryContextSwitchTo(oldcxt);
     162              :     }
     163              : 
     164         2962 :     table_endscan(scan);
     165         2962 :     table_close(rel, AccessShareLock);
     166              : 
     167         2962 :     CommitTransactionCommand();
     168              : 
     169         2962 :     return res;
     170              : }
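
The launcher's main loop (outside this excerpt) walks the returned list with the standard List API. A condensed sketch; the loop body is illustrative, not code from launcher.c:

    List       *sublist = get_subscription_list();
    ListCell   *lc;

    foreach(lc, sublist)
    {
        Subscription *sub = (Subscription *) lfirst(lc);

        /* Only enabled subscriptions are candidates for an apply worker. */
        if (!sub->enabled)
            continue;

        /* ... decide whether to start a worker for sub->oid ... */
    }
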
     171              : 
     172              : /*
     173              :  * Wait for a background worker to start up and attach to the shmem context.
     174              :  *
     175              :  * This is only needed for cleaning up the shared memory in case the worker
     176              :  * fails to attach.
     177              :  *
     178              :  * Returns whether the attach was successful.
     179              :  */
     180              : static bool
     181          430 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     182              :                                uint16 generation,
     183              :                                BackgroundWorkerHandle *handle)
     184              : {
     185          430 :     bool        result = false;
     186          430 :     bool        dropped_latch = false;
     187              : 
     188              :     for (;;)
     189         1308 :     {
     190              :         BgwHandleStatus status;
     191              :         pid_t       pid;
     192              :         int         rc;
     193              : 
     194         1738 :         CHECK_FOR_INTERRUPTS();
     195              : 
     196         1738 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     197              : 
     198              :         /* Worker either died or has started. Return false if died. */
     199         1738 :         if (!worker->in_use || worker->proc)
     200              :         {
     201          430 :             result = worker->in_use;
     202          430 :             LWLockRelease(LogicalRepWorkerLock);
     203          430 :             break;
     204              :         }
     205              : 
     206         1308 :         LWLockRelease(LogicalRepWorkerLock);
     207              : 
     208              :         /* Check if worker has died before attaching, and clean up after it. */
     209         1308 :         status = GetBackgroundWorkerPid(handle, &pid);
     210              : 
     211         1308 :         if (status == BGWH_STOPPED)
     212              :         {
     213            0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     214              :             /* Ensure that this was indeed the worker we waited for. */
     215            0 :             if (generation == worker->generation)
     216            0 :                 logicalrep_worker_cleanup(worker);
     217            0 :             LWLockRelease(LogicalRepWorkerLock);
     218            0 :             break;              /* result is already false */
     219              :         }
     220              : 
     221              :         /*
      222              :          * We need a timeout because we generally don't get notified via latch
     223              :          * about the worker attach.  But we don't expect to have to wait long.
     224              :          */
     225         1308 :         rc = WaitLatch(MyLatch,
     226              :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     227              :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     228              : 
     229         1308 :         if (rc & WL_LATCH_SET)
     230              :         {
     231          498 :             ResetLatch(MyLatch);
     232          498 :             CHECK_FOR_INTERRUPTS();
     233          498 :             dropped_latch = true;
     234              :         }
     235              :     }
     236              : 
     237              :     /*
     238              :      * If we had to clear a latch event in order to wait, be sure to restore
     239              :      * it before exiting.  Otherwise caller may miss events.
     240              :      */
     241          430 :     if (dropped_latch)
     242          429 :         SetLatch(MyLatch);
     243              : 
     244          430 :     return result;
     245              : }
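
The loop above is the standard backend wait pattern: recheck shared state under the lock, sleep on the process latch with a short timeout, reset the latch when it fires, and restore it on exit so the caller cannot miss events. Stripped to its skeleton, with "done" standing in as a hypothetical exit condition:

    for (;;)
    {
        int         rc;

        CHECK_FOR_INTERRUPTS();

        if (done)               /* recheck the awaited condition */
            break;

        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                       10L /* ms */, WAIT_EVENT_BGWORKER_STARTUP);

        if (rc & WL_LATCH_SET)
        {
            ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
        }
    }
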
     246              : 
     247              : /*
      248              :  * Walks the workers array and searches for one matching the given worker type,
     249              :  * subscription id, and relation id.
     250              :  *
     251              :  * For both apply workers and sequencesync workers, the relid should be set to
     252              :  * InvalidOid, as these workers handle changes across all tables and sequences
     253              :  * respectively, rather than targeting a specific relation. For tablesync
     254              :  * workers, the relid should be set to the OID of the relation being
     255              :  * synchronized.
     256              :  */
     257              : LogicalRepWorker *
     258         3216 : logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
     259              :                        bool only_running)
     260              : {
     261              :     int         i;
     262         3216 :     LogicalRepWorker *res = NULL;
     263              : 
     264              :     /* relid must be valid only for table sync workers */
     265              :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
     266              :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     267              : 
     268              :     /* Search for an attached worker that matches the specified criteria. */
     269         9766 :     for (i = 0; i < max_logical_replication_workers; i++)
     270              :     {
     271         8540 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     272              : 
     273              :         /* Skip parallel apply workers. */
     274         8540 :         if (isParallelApplyWorker(w))
     275            0 :             continue;
     276              : 
     277         8540 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     278         2014 :             w->type == wtype && (!only_running || w->proc))
     279              :         {
     280         1990 :             res = w;
     281         1990 :             break;
     282              :         }
     283              :     }
     284              : 
     285         3216 :     return res;
     286              : }
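
Because the returned pointer refers into shared memory, it is only safe to dereference while LogicalRepWorkerLock is held. A typical caller, essentially what logicalrep_worker_wakeup() does further below:

    LogicalRepWorker *worker;

    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    worker = logicalrep_worker_find(WORKERTYPE_APPLY, subid, InvalidOid, true);
    if (worker)
        logicalrep_worker_wakeup_ptr(worker);   /* pointer valid under lock */
    LWLockRelease(LogicalRepWorkerLock);
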
     287              : 
     288              : /*
     289              :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     290              :  * the subscription, instead of just one.
     291              :  */
     292              : List *
     293          669 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
     294              : {
     295              :     int         i;
     296          669 :     List       *res = NIL;
     297              : 
     298          669 :     if (acquire_lock)
     299          120 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     300              : 
     301              :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     302              : 
     303              :     /* Search for attached worker for a given subscription id. */
     304         3469 :     for (i = 0; i < max_logical_replication_workers; i++)
     305              :     {
     306         2800 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     307              : 
     308         2800 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     309          483 :             res = lappend(res, w);
     310              :     }
     311              : 
     312          669 :     if (acquire_lock)
     313          120 :         LWLockRelease(LogicalRepWorkerLock);
     314              : 
     315          669 :     return res;
     316              : }
     317              : 
     318              : /*
     319              :  * Start new logical replication background worker, if possible.
     320              :  *
     321              :  * Returns true on success, false on failure.
     322              :  */
     323              : bool
     324          430 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
     325              :                          Oid dbid, Oid subid, const char *subname, Oid userid,
     326              :                          Oid relid, dsm_handle subworker_dsm,
     327              :                          bool retain_dead_tuples)
     328              : {
     329              :     BackgroundWorker bgw;
     330              :     BackgroundWorkerHandle *bgw_handle;
     331              :     uint16      generation;
     332              :     int         i;
     333          430 :     int         slot = 0;
     334          430 :     LogicalRepWorker *worker = NULL;
     335              :     int         nsyncworkers;
     336              :     int         nparallelapplyworkers;
     337              :     TimestampTz now;
     338          430 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
     339          430 :     bool        is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
     340          430 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
     341              : 
     342              :     /*----------
     343              :      * Sanity checks:
      344              :      * - must be a valid worker type
      345              :      * - tablesync workers are the only ones to have a relid
     346              :      * - parallel apply worker is the only kind of subworker
     347              :      * - The replication slot used in conflict detection is created when
     348              :      *   retain_dead_tuples is enabled
     349              :      */
     350              :     Assert(wtype != WORKERTYPE_UNKNOWN);
     351              :     Assert(is_tablesync_worker == OidIsValid(relid));
     352              :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
     353              :     Assert(!retain_dead_tuples || MyReplicationSlot);
     354              : 
     355          430 :     ereport(DEBUG1,
     356              :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     357              :                              subname)));
     358              : 
     359              :     /* Report this after the initial starting message for consistency. */
     360          430 :     if (max_active_replication_origins == 0)
     361            0 :         ereport(ERROR,
     362              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     363              :                  errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
     364              : 
     365              :     /*
     366              :      * We need to do the modification of the shared memory under lock so that
     367              :      * we have consistent view.
     368              :      */
     369          430 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     370              : 
     371          430 : retry:
     372              :     /* Find unused worker slot. */
     373          758 :     for (i = 0; i < max_logical_replication_workers; i++)
     374              :     {
     375          758 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     376              : 
     377          758 :         if (!w->in_use)
     378              :         {
     379          430 :             worker = w;
     380          430 :             slot = i;
     381          430 :             break;
     382              :         }
     383              :     }
     384              : 
     385          430 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     386              : 
     387          430 :     now = GetCurrentTimestamp();
     388              : 
     389              :     /*
      390              :      * If we didn't find a free slot, try to do garbage collection.  We do
      391              :      * this because if some worker failed to start up and its parent crashed
      392              :      * while waiting, the in_use state was never cleared.
     393              :      */
     394          430 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     395              :     {
     396            0 :         bool        did_cleanup = false;
     397              : 
     398            0 :         for (i = 0; i < max_logical_replication_workers; i++)
     399              :         {
     400            0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     401              : 
     402              :             /*
     403              :              * If the worker was marked in use but didn't manage to attach in
     404              :              * time, clean it up.
     405              :              */
     406            0 :             if (w->in_use && !w->proc &&
     407            0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     408              :                                            wal_receiver_timeout))
     409              :             {
     410            0 :                 elog(WARNING,
     411              :                      "logical replication worker for subscription %u took too long to start; canceled",
     412              :                      w->subid);
     413              : 
     414            0 :                 logicalrep_worker_cleanup(w);
     415            0 :                 did_cleanup = true;
     416              :             }
     417              :         }
     418              : 
     419            0 :         if (did_cleanup)
     420            0 :             goto retry;
     421              :     }
     422              : 
     423              :     /*
      424              :      * We don't allow invoking more sync workers once we have reached the
     425              :      * sync worker limit per subscription. So, just return silently as we
     426              :      * might get here because of an otherwise harmless race condition.
     427              :      */
     428          430 :     if ((is_tablesync_worker || is_sequencesync_worker) &&
     429          206 :         nsyncworkers >= max_sync_workers_per_subscription)
     430              :     {
     431            0 :         LWLockRelease(LogicalRepWorkerLock);
     432            0 :         return false;
     433              :     }
     434              : 
     435          430 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     436              : 
     437              :     /*
     438              :      * Return false if the number of parallel apply workers reached the limit
     439              :      * per subscription.
     440              :      */
     441          430 :     if (is_parallel_apply_worker &&
     442           12 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     443              :     {
     444            0 :         LWLockRelease(LogicalRepWorkerLock);
     445            0 :         return false;
     446              :     }
     447              : 
     448              :     /*
      449              :      * However, if there are no more free worker slots, inform the user
     450              :      * before exiting.
     451              :      */
     452          430 :     if (worker == NULL)
     453              :     {
     454            0 :         LWLockRelease(LogicalRepWorkerLock);
     455            0 :         ereport(WARNING,
     456              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     457              :                  errmsg("out of logical replication worker slots"),
     458              :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
     459            0 :         return false;
     460              :     }
     461              : 
     462              :     /* Prepare the worker slot. */
     463          430 :     worker->type = wtype;
     464          430 :     worker->launch_time = now;
     465          430 :     worker->in_use = true;
     466          430 :     worker->generation++;
     467          430 :     worker->proc = NULL;
     468          430 :     worker->dbid = dbid;
     469          430 :     worker->userid = userid;
     470          430 :     worker->subid = subid;
     471          430 :     worker->relid = relid;
     472          430 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     473          430 :     worker->relstate_lsn = InvalidXLogRecPtr;
     474          430 :     worker->stream_fileset = NULL;
     475          430 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     476          430 :     worker->parallel_apply = is_parallel_apply_worker;
     477          430 :     worker->oldest_nonremovable_xid = retain_dead_tuples
     478            2 :         ? MyReplicationSlot->data.xmin
     479          430 :         : InvalidTransactionId;
     480          430 :     worker->last_lsn = InvalidXLogRecPtr;
     481          430 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     482          430 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     483          430 :     worker->reply_lsn = InvalidXLogRecPtr;
     484          430 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     485          430 :     worker->last_seqsync_start_time = 0;
     486              : 
     487              :     /* Before releasing lock, remember generation for future identification. */
     488          430 :     generation = worker->generation;
     489              : 
     490          430 :     LWLockRelease(LogicalRepWorkerLock);
     491              : 
     492              :     /* Register the new dynamic worker. */
     493          430 :     memset(&bgw, 0, sizeof(bgw));
     494          430 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     495              :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     496          430 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     497          430 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     498              : 
     499          430 :     switch (worker->type)
     500              :     {
     501          212 :         case WORKERTYPE_APPLY:
     502          212 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     503          212 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     504              :                      "logical replication apply worker for subscription %u",
     505              :                      subid);
     506          212 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
     507          212 :             break;
     508              : 
     509           12 :         case WORKERTYPE_PARALLEL_APPLY:
     510           12 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     511           12 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     512              :                      "logical replication parallel apply worker for subscription %u",
     513              :                      subid);
     514           12 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     515              : 
     516           12 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     517           12 :             break;
     518              : 
     519            9 :         case WORKERTYPE_SEQUENCESYNC:
     520            9 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
     521            9 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     522              :                      "logical replication sequencesync worker for subscription %u",
     523              :                      subid);
     524            9 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
     525            9 :             break;
     526              : 
     527          197 :         case WORKERTYPE_TABLESYNC:
     528          197 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
     529          197 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     530              :                      "logical replication tablesync worker for subscription %u sync %u",
     531              :                      subid,
     532              :                      relid);
     533          197 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
     534          197 :             break;
     535              : 
     536            0 :         case WORKERTYPE_UNKNOWN:
     537              :             /* Should never happen. */
     538            0 :             elog(ERROR, "unknown worker type");
     539              :     }
     540              : 
     541          430 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     542          430 :     bgw.bgw_notify_pid = MyProcPid;
     543          430 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     544              : 
     545          430 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     546              :     {
     547              :         /* Failed to start worker, so clean up the worker slot. */
     548            0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     549              :         Assert(generation == worker->generation);
     550            0 :         logicalrep_worker_cleanup(worker);
     551            0 :         LWLockRelease(LogicalRepWorkerLock);
     552              : 
     553            0 :         ereport(WARNING,
     554              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     555              :                  errmsg("out of background worker slots"),
     556              :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
     557            0 :         return false;
     558              :     }
     559              : 
     560              :     /* Now wait until it attaches. */
     561          430 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     562              : }
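
As an illustration, the launcher's main loop (not part of this excerpt) starts a plain apply worker along these lines, with the retry policy elided; "sub" is a Subscription taken from get_subscription_list():

    if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
                                  sub->dbid, sub->oid, sub->name,
                                  sub->owner,
                                  InvalidOid,          /* apply workers have no relid */
                                  DSM_HANDLE_INVALID,  /* no parallel-apply DSM */
                                  false))              /* retain_dead_tuples */
    {
        /* out of slots or the worker failed to attach; try again later */
    }
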
     563              : 
     564              : /*
     565              :  * Internal function to stop the worker and wait until it detaches from the
     566              :  * slot.
     567              :  */
     568              : static void
     569           84 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
     570              : {
     571              :     uint16      generation;
     572              : 
     573              :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
     574              : 
     575              :     /*
      576              :      * Remember our worker's generation so we can check whether what we see
      577              :      * is still the same worker.
     578              :      */
     579           84 :     generation = worker->generation;
     580              : 
     581              :     /*
     582              :      * If we found a worker but it does not have proc set then it is still
     583              :      * starting up; wait for it to finish starting and then kill it.
     584              :      */
     585           85 :     while (worker->in_use && !worker->proc)
     586              :     {
     587              :         int         rc;
     588              : 
     589            3 :         LWLockRelease(LogicalRepWorkerLock);
     590              : 
     591              :         /* Wait a bit --- we don't expect to have to wait long. */
     592            3 :         rc = WaitLatch(MyLatch,
     593              :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     594              :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     595              : 
     596            3 :         if (rc & WL_LATCH_SET)
     597              :         {
     598            0 :             ResetLatch(MyLatch);
     599            0 :             CHECK_FOR_INTERRUPTS();
     600              :         }
     601              : 
     602              :         /* Recheck worker status. */
     603            3 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     604              : 
     605              :         /*
     606              :          * Check whether the worker slot is no longer used, which would mean
     607              :          * that the worker has exited, or whether the worker generation is
     608              :          * different, meaning that a different worker has taken the slot.
     609              :          */
     610            3 :         if (!worker->in_use || worker->generation != generation)
     611            0 :             return;
     612              : 
     613              :         /* Worker has assigned proc, so it has started. */
     614            3 :         if (worker->proc)
     615            2 :             break;
     616              :     }
     617              : 
     618              :     /* Now terminate the worker ... */
     619           84 :     kill(worker->proc->pid, signo);
     620              : 
     621              :     /* ... and wait for it to die. */
     622              :     for (;;)
     623          122 :     {
     624              :         int         rc;
     625              : 
     626              :         /* is it gone? */
     627          206 :         if (!worker->proc || worker->generation != generation)
     628              :             break;
     629              : 
     630          122 :         LWLockRelease(LogicalRepWorkerLock);
     631              : 
     632              :         /* Wait a bit --- we don't expect to have to wait long. */
     633          122 :         rc = WaitLatch(MyLatch,
     634              :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     635              :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     636              : 
     637          122 :         if (rc & WL_LATCH_SET)
     638              :         {
     639           44 :             ResetLatch(MyLatch);
     640           44 :             CHECK_FOR_INTERRUPTS();
     641              :         }
     642              : 
     643          122 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     644              :     }
     645              : }
     646              : 
     647              : /*
     648              :  * Stop the logical replication worker that matches the specified worker type,
     649              :  * subscription id, and relation id.
     650              :  */
     651              : void
     652           96 : logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
     653              : {
     654              :     LogicalRepWorker *worker;
     655              : 
     656              :     /* relid must be valid only for table sync workers */
     657              :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
     658              : 
     659           96 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     660              : 
     661           96 :     worker = logicalrep_worker_find(wtype, subid, relid, false);
     662              : 
     663           96 :     if (worker)
     664              :     {
     665              :         Assert(!isParallelApplyWorker(worker));
     666           73 :         logicalrep_worker_stop_internal(worker, SIGTERM);
     667              :     }
     668              : 
     669           96 :     LWLockRelease(LogicalRepWorkerLock);
     670           96 : }
     671              : 
     672              : /*
     673              :  * Stop the given logical replication parallel apply worker.
     674              :  *
      675              :  * Note that the function sends SIGUSR2 instead of SIGTERM to the parallel apply
     676              :  * worker so that the worker exits cleanly.
     677              :  */
     678              : void
     679            5 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
     680              : {
     681              :     int         slot_no;
     682              :     uint16      generation;
     683              :     LogicalRepWorker *worker;
     684              : 
     685            5 :     SpinLockAcquire(&winfo->shared->mutex);
     686            5 :     generation = winfo->shared->logicalrep_worker_generation;
     687            5 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
     688            5 :     SpinLockRelease(&winfo->shared->mutex);
     689              : 
     690              :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
     691              : 
     692              :     /*
     693              :      * Detach from the error_mq_handle for the parallel apply worker before
     694              :      * stopping it. This prevents the leader apply worker from trying to
     695              :      * receive the message from the error queue that might already be detached
     696              :      * by the parallel apply worker.
     697              :      */
     698            5 :     if (winfo->error_mq_handle)
     699              :     {
     700            5 :         shm_mq_detach(winfo->error_mq_handle);
     701            5 :         winfo->error_mq_handle = NULL;
     702              :     }
     703              : 
     704            5 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     705              : 
     706            5 :     worker = &LogicalRepCtx->workers[slot_no];
     707              :     Assert(isParallelApplyWorker(worker));
     708              : 
     709              :     /*
     710              :      * Only stop the worker if the generation matches and the worker is alive.
     711              :      */
     712            5 :     if (worker->generation == generation && worker->proc)
     713            5 :         logicalrep_worker_stop_internal(worker, SIGUSR2);
     714              : 
     715            5 :     LWLockRelease(LogicalRepWorkerLock);
     716            5 : }
     717              : 
     718              : /*
     719              :  * Wake up (using latch) any logical replication worker that matches the
     720              :  * specified worker type, subscription id, and relation id.
     721              :  */
     722              : void
     723          215 : logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
     724              : {
     725              :     LogicalRepWorker *worker;
     726              : 
     727              :     /* relid must be valid only for table sync workers */
     728              :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
     729              : 
     730          215 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     731              : 
     732          215 :     worker = logicalrep_worker_find(wtype, subid, relid, true);
     733              : 
     734          215 :     if (worker)
     735          215 :         logicalrep_worker_wakeup_ptr(worker);
     736              : 
     737          215 :     LWLockRelease(LogicalRepWorkerLock);
     738          215 : }
     739              : 
     740              : /*
     741              :  * Wake up (using latch) the specified logical replication worker.
     742              :  *
     743              :  * Caller must hold lock, else worker->proc could change under us.
     744              :  */
     745              : void
     746          657 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     747              : {
     748              :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     749              : 
     750          657 :     SetLatch(&worker->proc->procLatch);
     751          657 : }
     752              : 
     753              : /*
     754              :  * Attach to a slot.
     755              :  */
     756              : void
     757          549 : logicalrep_worker_attach(int slot)
     758              : {
     759              :     /* Block concurrent access. */
     760          549 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     761              : 
     762              :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     763          549 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     764              : 
     765          549 :     if (!MyLogicalRepWorker->in_use)
     766              :     {
     767            0 :         LWLockRelease(LogicalRepWorkerLock);
     768            0 :         ereport(ERROR,
     769              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     770              :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     771              :                         slot)));
     772              :     }
     773              : 
     774          549 :     if (MyLogicalRepWorker->proc)
     775              :     {
     776            0 :         LWLockRelease(LogicalRepWorkerLock);
     777            0 :         ereport(ERROR,
     778              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     779              :                  errmsg("logical replication worker slot %d is already used by "
     780              :                         "another worker, cannot attach", slot)));
     781              :     }
     782              : 
     783          549 :     MyLogicalRepWorker->proc = MyProc;
     784          549 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     785              : 
     786          549 :     LWLockRelease(LogicalRepWorkerLock);
     787          549 : }
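
The slot index travels through bgw_main_arg (see Int32GetDatum(slot) in logicalrep_worker_launch() above), so every worker entry point begins roughly like this sketch; the remainder of ApplyWorkerMain() lies outside this excerpt:

    void
    ApplyWorkerMain(Datum main_arg)
    {
        int         worker_slot = DatumGetInt32(main_arg);

        /* Claim our LogicalRepCtx->workers[] slot before anything else. */
        logicalrep_worker_attach(worker_slot);

        /* ... connect to the database, then enter the apply loop ... */
    }
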
     788              : 
     789              : /*
     790              :  * Stop the parallel apply workers if any, and detach the leader apply worker
     791              :  * (cleans up the worker info).
     792              :  */
     793              : static void
     794          549 : logicalrep_worker_detach(void)
     795              : {
     796              :     /* Stop the parallel apply workers. */
     797          549 :     if (am_leader_apply_worker())
     798              :     {
     799              :         List       *workers;
     800              :         ListCell   *lc;
     801              : 
     802              :         /*
     803              :          * Detach from the error_mq_handle for all parallel apply workers
     804              :          * before terminating them. This prevents the leader apply worker from
     805              :          * receiving the worker termination message and sending it to logs
     806              :          * when the same is already done by the parallel worker.
     807              :          */
     808          329 :         pa_detach_all_error_mq();
     809              : 
     810          329 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     811              : 
     812          329 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
     813          665 :         foreach(lc, workers)
     814              :         {
     815          336 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     816              : 
     817          336 :             if (isParallelApplyWorker(w))
     818            6 :                 logicalrep_worker_stop_internal(w, SIGTERM);
     819              :         }
     820              : 
     821          329 :         LWLockRelease(LogicalRepWorkerLock);
     822              : 
     823          329 :         list_free(workers);
     824              :     }
     825              : 
     826              :     /* Block concurrent access. */
     827          549 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     828              : 
     829          549 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     830              : 
     831          549 :     LWLockRelease(LogicalRepWorkerLock);
     832          549 : }
     833              : 
     834              : /*
     835              :  * Clean up worker info.
     836              :  */
     837              : static void
     838          549 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     839              : {
     840              :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     841              : 
     842          549 :     worker->type = WORKERTYPE_UNKNOWN;
     843          549 :     worker->in_use = false;
     844          549 :     worker->proc = NULL;
     845          549 :     worker->dbid = InvalidOid;
     846          549 :     worker->userid = InvalidOid;
     847          549 :     worker->subid = InvalidOid;
     848          549 :     worker->relid = InvalidOid;
     849          549 :     worker->leader_pid = InvalidPid;
     850          549 :     worker->parallel_apply = false;
     851          549 : }
     852              : 
     853              : /*
     854              :  * Cleanup function for logical replication launcher.
     855              :  *
     856              :  * Called on logical replication launcher exit.
     857              :  */
     858              : static void
     859          449 : logicalrep_launcher_onexit(int code, Datum arg)
     860              : {
     861          449 :     LogicalRepCtx->launcher_pid = 0;
     862          449 : }
     863              : 
     864              : /*
     865              :  * Reset the last_seqsync_start_time of the sequencesync worker in the
     866              :  * subscription's apply worker.
     867              :  *
     868              :  * Note that this value is not stored in the sequencesync worker, because that
     869              :  * has finished already and is about to exit.
     870              :  */
     871              : void
     872            5 : logicalrep_reset_seqsync_start_time(void)
     873              : {
     874              :     LogicalRepWorker *worker;
     875              : 
     876              :     /*
     877              :      * The apply worker can't access last_seqsync_start_time concurrently, so
     878              :      * it is okay to use SHARED lock here. See ProcessSequencesForSync().
     879              :      */
     880            5 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     881              : 
     882            5 :     worker = logicalrep_worker_find(WORKERTYPE_APPLY,
     883            5 :                                     MyLogicalRepWorker->subid, InvalidOid,
     884              :                                     true);
     885            5 :     if (worker)
     886            5 :         worker->last_seqsync_start_time = 0;
     887              : 
     888            5 :     LWLockRelease(LogicalRepWorkerLock);
     889            5 : }
     890              : 
     891              : /*
     892              :  * Cleanup function.
     893              :  *
     894              :  * Called on logical replication worker exit.
     895              :  */
     896              : static void
     897          549 : logicalrep_worker_onexit(int code, Datum arg)
     898              : {
     899              :     /* Disconnect gracefully from the remote side. */
     900          549 :     if (LogRepWorkerWalRcvConn)
     901          435 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
     902              : 
     903          549 :     logicalrep_worker_detach();
     904              : 
     905              :     /* Cleanup fileset used for streaming transactions. */
     906          549 :     if (MyLogicalRepWorker->stream_fileset != NULL)
     907           14 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     908              : 
     909              :     /*
     910              :      * Session level locks may be acquired outside of a transaction in
     911              :      * parallel apply mode and will not be released when the worker
     912              :      * terminates, so manually release all locks before the worker exits.
     913              :      *
     914              :      * The locks will be acquired once the worker is initialized.
     915              :      */
     916          549 :     if (!InitializingApplyWorker)
     917          487 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
     918              : 
     919          549 :     ApplyLauncherWakeup();
     920          549 : }
     921              : 
     922              : /*
     923              :  * Count the number of registered (not necessarily running) sync workers
     924              :  * for a subscription.
     925              :  */
     926              : int
     927         1301 : logicalrep_sync_worker_count(Oid subid)
     928              : {
     929              :     int         i;
     930         1301 :     int         res = 0;
     931              : 
     932              :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     933              : 
     934              :     /* Search for attached worker for a given subscription id. */
     935         6707 :     for (i = 0; i < max_logical_replication_workers; i++)
     936              :     {
     937         5406 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     938              : 
     939         5406 :         if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
     940         1454 :             res++;
     941              :     }
     942              : 
     943         1301 :     return res;
     944              : }
     945              : 
     946              : /*
     947              :  * Count the number of registered (but not necessarily running) parallel apply
     948              :  * workers for a subscription.
     949              :  */
     950              : static int
     951          430 : logicalrep_pa_worker_count(Oid subid)
     952              : {
     953              :     int         i;
     954          430 :     int         res = 0;
     955              : 
     956              :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     957              : 
     958              :     /*
     959              :      * Scan all attached parallel apply workers, only counting those which
     960              :      * have the given subscription id.
     961              :      */
     962         2270 :     for (i = 0; i < max_logical_replication_workers; i++)
     963              :     {
     964         1840 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     965              : 
     966         1840 :         if (isParallelApplyWorker(w) && w->subid == subid)
     967            2 :             res++;
     968              :     }
     969              : 
     970          430 :     return res;
     971              : }
     972              : 
     973              : /*
     974              :  * ApplyLauncherShmemSize
     975              :  *      Compute space needed for replication launcher shared memory
     976              :  */
     977              : Size
     978         4407 : ApplyLauncherShmemSize(void)
     979              : {
     980              :     Size        size;
     981              : 
     982              :     /*
     983              :      * Need the fixed struct and the array of LogicalRepWorker.
     984              :      */
     985         4407 :     size = sizeof(LogicalRepCtxStruct);
     986         4407 :     size = MAXALIGN(size);
     987         4407 :     size = add_size(size, mul_size(max_logical_replication_workers,
     988              :                                    sizeof(LogicalRepWorker)));
     989         4407 :     return size;
     990              : }
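
add_size() and mul_size() are the overflow-checked counterparts of + and *, so shared-memory sizing code follows this shape (MySharedState and MyWorkerSlot are hypothetical stand-ins):

    Size        size;

    size = MAXALIGN(sizeof(MySharedState));              /* fixed header */
    size = add_size(size, mul_size(nworkers,             /* flexible array */
                                   sizeof(MyWorkerSlot)));

With the default max_logical_replication_workers = 4, ApplyLauncherShmemSize() is just the MAXALIGN'd fixed struct plus four LogicalRepWorker slots.
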
     991              : 
     992              : /*
     993              :  * ApplyLauncherRegister
     994              :  *      Register a background worker running the logical replication launcher.
     995              :  */
     996              : void
     997          919 : ApplyLauncherRegister(void)
     998              : {
     999              :     BackgroundWorker bgw;
    1000              : 
    1001              :     /*
    1002              :      * The logical replication launcher is disabled during binary upgrades, to
    1003              :      * prevent logical replication workers from running on the source cluster.
    1004              :      * That could cause replication origins to move forward after having been
    1005              :      * copied to the target cluster, potentially creating conflicts with the
    1006              :      * copied data files.
    1007              :      */
    1008          919 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
    1009           57 :         return;
    1010              : 
    1011          862 :     memset(&bgw, 0, sizeof(bgw));
    1012          862 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
    1013              :         BGWORKER_BACKEND_DATABASE_CONNECTION;
    1014          862 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
    1015          862 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
    1016          862 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
    1017          862 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
    1018              :              "logical replication launcher");
    1019          862 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
    1020              :              "logical replication launcher");
    1021          862 :     bgw.bgw_restart_time = 5;
    1022          862 :     bgw.bgw_notify_pid = 0;
    1023          862 :     bgw.bgw_main_arg = (Datum) 0;
    1024              : 
    1025          862 :     RegisterBackgroundWorker(&bgw);
    1026              : }
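
The same registration dance works from an extension loaded via shared_preload_libraries; a sketch under that assumption, with my_extension and my_worker_main as hypothetical names:

    void
    _PG_init(void)
    {
        BackgroundWorker bgw;

        /* Static workers can only be registered at preload time. */
        if (!process_shared_preload_libraries_in_progress)
            return;

        memset(&bgw, 0, sizeof(bgw));
        bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
        bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
        snprintf(bgw.bgw_library_name, MAXPGPATH, "my_extension");
        snprintf(bgw.bgw_function_name, BGW_MAXLEN, "my_worker_main");
        snprintf(bgw.bgw_name, BGW_MAXLEN, "my extension worker");
        snprintf(bgw.bgw_type, BGW_MAXLEN, "my extension worker");
        bgw.bgw_restart_time = 5;   /* seconds, matching the launcher */
        bgw.bgw_notify_pid = 0;

        RegisterBackgroundWorker(&bgw);
    }
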
    1027              : 
    1028              : /*
    1029              :  * ApplyLauncherShmemInit
    1030              :  *      Allocate and initialize replication launcher shared memory
    1031              :  */
    1032              : void
    1033         1140 : ApplyLauncherShmemInit(void)
    1034              : {
    1035              :     bool        found;
    1036              : 
    1037         1140 :     LogicalRepCtx = (LogicalRepCtxStruct *)
    1038         1140 :         ShmemInitStruct("Logical Replication Launcher Data",
    1039              :                         ApplyLauncherShmemSize(),
    1040              :                         &found);
    1041              : 
    1042         1140 :     if (!found)
    1043              :     {
    1044              :         int         slot;
    1045              : 
    1046         1140 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
    1047              : 
    1048         1140 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
    1049         1140 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
    1050              : 
    1051              :         /* Initialize memory and spin locks for each worker slot. */
    1052         5663 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
    1053              :         {
    1054         4523 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
    1055              : 
    1056         4523 :             memset(worker, 0, sizeof(LogicalRepWorker));
    1057         4523 :             SpinLockInit(&worker->relmutex);
    1058              :         }
    1059              :     }
    1060         1140 : }
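
ShmemInitStruct() implements a create-or-attach protocol: exactly one caller sees found == false and must initialize the contents; every later caller just receives the existing pointer. Extensions follow the same idiom from a shmem_startup_hook, except that they serialize on AddinShmemInitLock. A minimal sketch, assuming a hypothetical MyExtCtxStruct and my_state pointer:

    typedef struct { int counter; } MyExtCtxStruct;     /* illustrative */
    static MyExtCtxStruct *my_state = NULL;

    static void
    my_shmem_startup(void)
    {
        bool        found;

        LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
        my_state = ShmemInitStruct("my_extension state",
                                   sizeof(MyExtCtxStruct), &found);
        if (!found)
            memset(my_state, 0, sizeof(MyExtCtxStruct));    /* first attach */
        LWLockRelease(AddinShmemInitLock);
    }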
    1061              : 
    1062              : /*
    1063              :  * Initialize or attach to the dynamic shared hash table that stores the
    1064              :  * last-start times, if not already done.
    1065              :  * This must be called before accessing the table.
    1066              :  */
    1067              : static void
    1068          771 : logicalrep_launcher_attach_dshmem(void)
    1069              : {
    1070              :     MemoryContext oldcontext;
    1071              : 
    1072              :     /* Quick exit if we already did this. */
    1073          771 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
    1074          714 :         last_start_times != NULL)
    1075          526 :         return;
    1076              : 
    1077              :     /* Otherwise, use a lock to ensure only one process creates the table. */
    1078          245 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    1079              : 
    1080              :     /* Be sure any local memory allocated by DSA routines is persistent. */
    1081          245 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1082              : 
    1083          245 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
    1084              :     {
    1085              :         /* Initialize dynamic shared hash table for last-start times. */
    1086           57 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
    1087           57 :         dsa_pin(last_start_times_dsa);
    1088           57 :         dsa_pin_mapping(last_start_times_dsa);
    1089           57 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
    1090              : 
    1091              :         /* Store handles in shared memory for other backends to use. */
    1092           57 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
    1093           57 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
    1094              :     }
    1095          188 :     else if (!last_start_times)
    1096              :     {
    1097              :         /* Attach to existing dynamic shared hash table. */
    1098          188 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
    1099          188 :         dsa_pin_mapping(last_start_times_dsa);
    1100          188 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
    1101          188 :                                          LogicalRepCtx->last_start_dsh, NULL);
    1102              :     }
    1103              : 
    1104          245 :     MemoryContextSwitchTo(oldcontext);
    1105          245 :     LWLockRelease(LogicalRepWorkerLock);
    1106              : }
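
The dsh_params used here is declared earlier in the file, outside this excerpt. For an Oid-keyed table like this one it would typically look like the following sketch, built from the byte-wise helpers in lib/dshash.h; treat the details as illustrative rather than a copy of the real definition:

    static const dshash_parameters dsh_params = {
        sizeof(Oid),                            /* key size */
        sizeof(LauncherLastStartTimesEntry),    /* full entry size */
        dshash_memcmp,                          /* compare keys byte-wise */
        dshash_memhash,                         /* hash keys byte-wise */
        dshash_memcpy,                          /* copy keys byte-wise */
        LWTRANCHE_LAUNCHER_HASH                 /* partition-lock tranche */
    };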
    1107              : 
    1108              : /*
    1109              :  * Set the last-start time for the subscription.
    1110              :  */
    1111              : static void
    1112          212 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
    1113              : {
    1114              :     LauncherLastStartTimesEntry *entry;
    1115              :     bool        found;
    1116              : 
    1117          212 :     logicalrep_launcher_attach_dshmem();
    1118              : 
    1119          212 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
    1120          212 :     entry->last_start_time = start_time;
    1121          212 :     dshash_release_lock(last_start_times, entry);
    1122          212 : }
    1123              : 
    1124              : /*
    1125              :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1126              :  */
    1127              : static TimestampTz
    1128          332 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1129              : {
    1130              :     LauncherLastStartTimesEntry *entry;
    1131              :     TimestampTz ret;
    1132              : 
    1133          332 :     logicalrep_launcher_attach_dshmem();
    1134              : 
    1135          332 :     entry = dshash_find(last_start_times, &subid, false);
    1136          332 :     if (entry == NULL)
    1137          129 :         return 0;
    1138              : 
    1139          203 :     ret = entry->last_start_time;
    1140          203 :     dshash_release_lock(last_start_times, entry);
    1141              : 
    1142          203 :     return ret;
    1143              : }
    1144              : 
    1145              : /*
    1146              :  * Remove the last-start-time entry for the subscription, if one exists.
    1147              :  *
    1148              :  * This has two use cases: to remove the entry related to a subscription
    1149              :  * that's been deleted or disabled (just to avoid leaking shared memory),
    1150              :  * and to allow immediate restart of an apply worker that has exited
    1151              :  * due to subscription parameter changes.
    1152              :  */
    1153              : void
    1154          227 : ApplyLauncherForgetWorkerStartTime(Oid subid)
    1155              : {
    1156          227 :     logicalrep_launcher_attach_dshmem();
    1157              : 
    1158          227 :     (void) dshash_delete_key(last_start_times, &subid);
    1159          227 : }
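
A hedged illustration of the second use case: after changing subscription parameters that force the apply worker to exit, the backend running the DDL can clear the throttle entry and nudge the launcher, so the worker restarts without waiting out wal_retrieve_retry_interval (the surrounding ALTER SUBSCRIPTION code is elided):

    ApplyLauncherForgetWorkerStartTime(subid);
    ApplyLauncherWakeupAtCommit();      /* see below */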
    1160              : 
    1161              : /*
    1162              :  * Wake up the launcher on commit if requested.
    1163              :  */
    1164              : void
    1165       552456 : AtEOXact_ApplyLauncher(bool isCommit)
    1166              : {
    1167       552456 :     if (isCommit)
    1168              :     {
    1169       525754 :         if (on_commit_launcher_wakeup)
    1170          145 :             ApplyLauncherWakeup();
    1171              :     }
    1172              : 
    1173       552456 :     on_commit_launcher_wakeup = false;
    1174       552456 : }
    1175              : 
    1176              : /*
    1177              :  * Request wakeup of the launcher on commit of the transaction.
    1178              :  *
    1179              :  * This is used to signal the launcher to stop sleeping and process the
    1180              :  * subscriptions when the current transaction commits. It should be used
    1181              :  * when a new tuple has been added to the pg_subscription catalog.
    1182              :  */
    1183              : void
    1184          146 : ApplyLauncherWakeupAtCommit(void)
    1185              : {
    1186          146 :     if (!on_commit_launcher_wakeup)
    1187          145 :         on_commit_launcher_wakeup = true;
    1188          146 : }
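
A hedged sketch of the intended call sequence when a subscription is created (simplified; the real CREATE SUBSCRIPTION path does considerably more):

    CatalogTupleInsert(rel, tup);       /* new pg_subscription row */
    ApplyLauncherWakeupAtCommit();      /* launcher rescans after commit */

Signaling at commit rather than immediately matters because the launcher reads pg_subscription: a wakeup delivered before the transaction commits could make it scan the catalog without seeing the new row.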
    1189              : 
    1190              : /*
    1191              :  * Wake up the launcher immediately.
    1192              :  */
    1193              : void
    1194          731 : ApplyLauncherWakeup(void)
    1195              : {
    1196          731 :     if (LogicalRepCtx->launcher_pid != 0)
    1197          704 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1198          731 : }
    1199              : 
    1200              : /*
    1201              :  * Main loop for the apply launcher process.
    1202              :  */
    1203              : void
    1204          449 : ApplyLauncherMain(Datum main_arg)
    1205              : {
    1206          449 :     ereport(DEBUG1,
    1207              :             (errmsg_internal("logical replication launcher started")));
    1208              : 
    1209          449 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1210              : 
    1211              :     Assert(LogicalRepCtx->launcher_pid == 0);
    1212          449 :     LogicalRepCtx->launcher_pid = MyProcPid;
    1213              : 
    1214              :     /* Establish signal handlers. */
    1215          449 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1216          449 :     pqsignal(SIGTERM, die);
    1217          449 :     BackgroundWorkerUnblockSignals();
    1218              : 
    1219              :     /*
    1220              :      * Establish connection to nailed catalogs (we only ever access
    1221              :      * pg_subscription).
    1222              :      */
    1223          449 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1224              : 
    1225              :     /*
    1226              :      * Acquire the conflict detection slot at startup to ensure it can be
    1227              :      * dropped if no longer needed after a restart.
    1228              :      */
    1229          449 :     acquire_conflict_slot_if_exists();
    1230              : 
    1231              :     /* Enter main loop */
    1232              :     for (;;)
    1233         2514 :     {
    1234              :         int         rc;
    1235              :         List       *sublist;
    1236              :         ListCell   *lc;
    1237              :         MemoryContext subctx;
    1238              :         MemoryContext oldctx;
    1239         2963 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1240         2963 :         bool        can_update_xmin = true;
    1241         2963 :         bool        retain_dead_tuples = false;
    1242         2963 :         TransactionId xmin = InvalidTransactionId;
    1243              : 
    1244         2963 :         CHECK_FOR_INTERRUPTS();
    1245              : 
    1246              :         /* Use temporary context to avoid leaking memory across cycles. */
    1247         2962 :         subctx = AllocSetContextCreate(TopMemoryContext,
    1248              :                                        "Logical Replication Launcher sublist",
    1249              :                                        ALLOCSET_DEFAULT_SIZES);
    1250         2962 :         oldctx = MemoryContextSwitchTo(subctx);
    1251              : 
    1252              :         /*
    1253              :          * Start any missing workers for enabled subscriptions.
    1254              :          *
    1255              :          * Also, during the iteration through all subscriptions, we compute
    1256              :          * the minimum XID required to protect deleted tuples for conflict
    1257              :  * detection if any of the subscriptions has the retain_dead_tuples
    1258              :  * option enabled.
    1259              :          */
    1260         2962 :         sublist = get_subscription_list();
    1261         3885 :         foreach(lc, sublist)
    1262              :         {
    1263          923 :             Subscription *sub = (Subscription *) lfirst(lc);
    1264              :             LogicalRepWorker *w;
    1265              :             TimestampTz last_start;
    1266              :             TimestampTz now;
    1267              :             long        elapsed;
    1268              : 
    1269          923 :             if (sub->retaindeadtuples)
    1270              :             {
    1271           63 :                 retain_dead_tuples = true;
    1272              : 
    1273              :                 /*
    1274              :                  * Create a replication slot to retain information necessary
    1275              :                  * for conflict detection such as dead tuples, commit
    1276              :                  * timestamps, and origins.
    1277              :                  *
    1278              :                  * The slot is created before starting the apply worker to
    1279              :                  * prevent it from unnecessarily maintaining its
    1280              :                  * oldest_nonremovable_xid.
    1281              :                  *
    1282              :                  * The slot is created even for a disabled subscription to
    1283              :                  * ensure that conflict-related information is available when
    1284              :                  * applying remote changes that occurred before the
    1285              :                  * subscription was enabled.
    1286              :                  */
    1287           63 :                 CreateConflictDetectionSlot();
    1288              : 
    1289           63 :                 if (sub->retentionactive)
    1290              :                 {
    1291              :                     /*
    1292              :                      * Can't advance xmin of the slot unless all the
    1293              :                      * subscriptions actively retaining dead tuples are
    1294              :                      * enabled. This is required to ensure that we don't
    1295              :                      * advance the xmin of CONFLICT_DETECTION_SLOT if one of
    1296              :                      * the subscriptions is not enabled. Otherwise, we won't
    1297              :                      * be able to detect conflicts reliably for such a
    1298              :                      * subscription even though it has set the
    1299              :                      * retain_dead_tuples option.
    1300              :                      */
    1301           63 :                     can_update_xmin &= sub->enabled;
    1302              : 
    1303              :                     /*
    1304              :                      * Initialize the slot once the subscription activates
    1305              :                      * retention.
    1306              :                      */
    1307           63 :                     if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
    1308            0 :                         init_conflict_slot_xmin();
    1309              :                 }
    1310              :             }
    1311              : 
    1312          923 :             if (!sub->enabled)
    1313           43 :                 continue;
    1314              : 
    1315          880 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1316          880 :             w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
    1317              :                                        false);
    1318              : 
    1319          880 :             if (w != NULL)
    1320              :             {
    1321              :                 /*
    1322              :                  * Compute the minimum xmin required to protect dead tuples
    1323              :                  * required for conflict detection among all running apply
    1324              :                  * workers. This computation is performed while holding
    1325              :                  * LogicalRepWorkerLock to prevent accessing invalid worker
    1326              :                  * data, in scenarios where a worker might exit and reset its
    1327              :                  * state concurrently.
    1328              :                  */
    1329          548 :                 if (sub->retaindeadtuples &&
    1330           58 :                     sub->retentionactive &&
    1331              :                     can_update_xmin)
    1332           58 :                     compute_min_nonremovable_xid(w, &xmin);
    1333              : 
    1334          548 :                 LWLockRelease(LogicalRepWorkerLock);
    1335              : 
    1336              :                 /* worker is running already */
    1337          548 :                 continue;
    1338              :             }
    1339              : 
    1340          332 :             LWLockRelease(LogicalRepWorkerLock);
    1341              : 
    1342              :             /*
    1343              :              * Can't advance xmin of the slot unless all the workers
    1344              :              * corresponding to subscriptions actively retaining dead tuples
    1345              :  * are running, so disable any further computation of the minimum
    1346              :  * nonremovable xid.
    1347              :              */
    1348          332 :             if (sub->retaindeadtuples && sub->retentionactive)
    1349            2 :                 can_update_xmin = false;
    1350              : 
    1351              :             /*
    1352              :              * If the worker is eligible to start now, launch it.  Otherwise,
    1353              :              * adjust wait_time so that we'll wake up as soon as it can be
    1354              :              * started.
    1355              :              *
    1356              :              * Each subscription's apply worker can only be restarted once per
    1357              :              * wal_retrieve_retry_interval, so that errors do not cause us to
    1358              :              * repeatedly restart the worker as fast as possible.  In cases
    1359              :              * where a restart is expected (e.g., subscription parameter
    1360              :              * changes), another process should remove the last-start entry
    1361              :              * for the subscription so that the worker can be restarted
    1362              :              * without waiting for wal_retrieve_retry_interval to elapse.
    1363              :              */
    1364          332 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1365          332 :             now = GetCurrentTimestamp();
    1366          332 :             if (last_start == 0 ||
    1367          203 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1368              :             {
    1369          212 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1370          216 :                 if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
    1371          212 :                                               sub->dbid, sub->oid, sub->name,
    1372              :                                               sub->owner, InvalidOid,
    1373              :                                               DSM_HANDLE_INVALID,
    1374          214 :                                               sub->retaindeadtuples &&
    1375          214 :                                               sub->retentionactive))
    1376              :                 {
    1377              :                     /*
    1378              :                      * We get here either if we failed to launch a worker
    1379              :                      * (perhaps for resource-exhaustion reasons) or if we
    1380              :                      * launched one but it immediately quit.  Either way, it
    1381              :                      * seems appropriate to try again after
    1382              :                      * wal_retrieve_retry_interval.
    1383              :                      */
    1384            4 :                     wait_time = Min(wait_time,
    1385              :                                     wal_retrieve_retry_interval);
    1386              :                 }
    1387              :             }
    1388              :             else
    1389              :             {
    1390          120 :                 wait_time = Min(wait_time,
    1391              :                                 wal_retrieve_retry_interval - elapsed);
    1392              :             }
    1393              :         }
    1394              : 
    1395              :         /*
    1396              :          * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
    1397              :          * that requires us to retain dead tuples. Otherwise, if required,
    1398              :  * advance the slot's xmin to protect the dead tuples required for
    1399              :  * conflict detection.
    1400              :          *
    1401              :          * Additionally, if all apply workers for subscriptions with
    1402              :          * retain_dead_tuples enabled have requested to stop retention, the
    1403              :          * slot's xmin will be set to InvalidTransactionId allowing the
    1404              :          * removal of dead tuples.
    1405              :          */
    1406         2962 :         if (MyReplicationSlot)
    1407              :         {
    1408           64 :             if (!retain_dead_tuples)
    1409            1 :                 ReplicationSlotDropAcquired();
    1410           63 :             else if (can_update_xmin)
    1411           58 :                 update_conflict_slot_xmin(xmin);
    1412              :         }
    1413              : 
    1414              :         /* Switch back to original memory context. */
    1415         2962 :         MemoryContextSwitchTo(oldctx);
    1416              :         /* Clean the temporary memory. */
    1417         2962 :         MemoryContextDelete(subctx);
    1418              : 
    1419              :         /* Wait for more work. */
    1420         2962 :         rc = WaitLatch(MyLatch,
    1421              :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1422              :                        wait_time,
    1423              :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1424              : 
    1425         2959 :         if (rc & WL_LATCH_SET)
    1426              :         {
    1427         2927 :             ResetLatch(MyLatch);
    1428         2927 :             CHECK_FOR_INTERRUPTS();
    1429              :         }
    1430              : 
    1431         2514 :         if (ConfigReloadPending)
    1432              :         {
    1433           38 :             ConfigReloadPending = false;
    1434           38 :             ProcessConfigFile(PGC_SIGHUP);
    1435              :         }
    1436              :     }
    1437              : 
    1438              :     /* Not reachable */
    1439              : }
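
A worked example of the throttling arithmetic above, assuming wal_retrieve_retry_interval at its default of 5000 ms: if an apply worker's last-start entry is 2000 ms old, then elapsed = 2000 < 5000, so the worker is not relaunched and wait_time becomes Min(180000, 5000 - 2000) = 3000 ms. The launcher therefore sleeps exactly until the worker becomes eligible again (or until its latch is set, whichever comes first).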
    1440              : 
    1441              : /*
    1442              :  * Determine the minimum non-removable transaction ID across all apply workers
    1443              :  * for subscriptions that have retain_dead_tuples enabled. Store the result
    1444              :  * in *xmin.
    1445              :  */
    1446              : static void
    1447           58 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
    1448              : {
    1449              :     TransactionId nonremovable_xid;
    1450              : 
    1451              :     Assert(worker != NULL);
    1452              : 
    1453              :     /*
    1454              :      * The replication slot for conflict detection must be created before the
    1455              :      * worker starts.
    1456              :      */
    1457              :     Assert(MyReplicationSlot);
    1458              : 
    1459           58 :     SpinLockAcquire(&worker->relmutex);
    1460           58 :     nonremovable_xid = worker->oldest_nonremovable_xid;
    1461           58 :     SpinLockRelease(&worker->relmutex);
    1462              : 
    1463              :     /*
    1464              :      * Return if the apply worker has stopped retention concurrently.
    1465              :      *
    1466              :      * Although this function is invoked only when retentionactive is true,
    1467              :      * the apply worker might stop retention after the launcher fetches the
    1468              :      * retentionactive flag.
    1469              :      */
    1470           58 :     if (!TransactionIdIsValid(nonremovable_xid))
    1471            0 :         return;
    1472              : 
    1473           58 :     if (!TransactionIdIsValid(*xmin) ||
    1474            0 :         TransactionIdPrecedes(nonremovable_xid, *xmin))
    1475           58 :         *xmin = nonremovable_xid;
    1476              : }
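
TransactionIdPrecedes() is used here instead of '<' because normal XIDs compare modulo 2^32: a numerically larger XID can be logically older once the counter has wrapped. A self-contained illustration (not part of launcher.c; TransactionIdPrecedes() comes from access/transam.h):

    static void
    xid_wraparound_demo(void)
    {
        TransactionId a = 4294967290U;  /* assigned just before wraparound */
        TransactionId b = 10;           /* assigned just after wraparound */

        /* (int32) (a - b) < 0, so a is logically older despite being larger */
        Assert(TransactionIdPrecedes(a, b));
    }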
    1477              : 
    1478              : /*
    1479              :  * Acquire the replication slot used to retain information for conflict
    1480              :  * detection, if it exists.
    1481              :  *
    1482              :  * Return true if successfully acquired, otherwise return false.
    1483              :  */
    1484              : static bool
    1485          449 : acquire_conflict_slot_if_exists(void)
    1486              : {
    1487          449 :     if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
    1488          448 :         return false;
    1489              : 
    1490            1 :     ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
    1491            1 :     return true;
    1492              : }
    1493              : 
    1494              : /*
    1495              :  * Update the xmin of the replication slot used to retain information
    1496              :  * required for conflict detection.
    1497              :  */
    1498              : static void
    1499           58 : update_conflict_slot_xmin(TransactionId new_xmin)
    1500              : {
    1501              :     Assert(MyReplicationSlot);
    1502              :     Assert(!TransactionIdIsValid(new_xmin) ||
    1503              :            TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
    1504              : 
    1505              :     /* Return if the xmin value of the slot does not need to change */
    1506           58 :     if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
    1507           44 :         return;
    1508              : 
    1509           14 :     SpinLockAcquire(&MyReplicationSlot->mutex);
    1510           14 :     MyReplicationSlot->effective_xmin = new_xmin;
    1511           14 :     MyReplicationSlot->data.xmin = new_xmin;
    1512           14 :     SpinLockRelease(&MyReplicationSlot->mutex);
    1513              : 
    1514           14 :     elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
    1515              : 
    1516           14 :     ReplicationSlotMarkDirty();
    1517           14 :     ReplicationSlotsComputeRequiredXmin(false);
    1518              : 
    1519              :     /*
    1520              :      * Like PhysicalConfirmReceivedLocation(), do not save slot information
    1521              :      * each time. This is acceptable because all concurrent transactions on
    1522              :      * the publisher that require the data preceding the slot's xmin should
    1523              :      * have already been applied and flushed on the subscriber before the xmin
    1524              :      * is advanced. So, even if the slot's xmin regresses after a restart, it
    1525              :      * will be advanced again in the next cycle. Therefore, no data required
    1526              :      * for conflict detection will be prematurely removed.
    1527              :      */
    1528           14 :     return;
    1529              : }
    1530              : 
    1531              : /*
    1532              :  * Initialize the xmin for the conflict detection slot.
    1533              :  */
    1534              : static void
    1535            4 : init_conflict_slot_xmin(void)
    1536              : {
    1537              :     TransactionId xmin_horizon;
    1538              : 
    1539              :     /* Replication slot must exist but shouldn't be initialized. */
    1540              :     /* Replication slot must exist but must not yet be initialized. */
    1541              :            !TransactionIdIsValid(MyReplicationSlot->data.xmin));
    1542              : 
    1543            4 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    1544            4 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    1545              : 
    1546            4 :     xmin_horizon = GetOldestSafeDecodingTransactionId(false);
    1547              : 
    1548            4 :     SpinLockAcquire(&MyReplicationSlot->mutex);
    1549            4 :     MyReplicationSlot->effective_xmin = xmin_horizon;
    1550            4 :     MyReplicationSlot->data.xmin = xmin_horizon;
    1551            4 :     SpinLockRelease(&MyReplicationSlot->mutex);
    1552              : 
    1553            4 :     ReplicationSlotsComputeRequiredXmin(true);
    1554              : 
    1555            4 :     LWLockRelease(ProcArrayLock);
    1556            4 :     LWLockRelease(ReplicationSlotControlLock);
    1557              : 
    1558              :     /* Write this slot to disk */
    1559            4 :     ReplicationSlotMarkDirty();
    1560            4 :     ReplicationSlotSave();
    1561            4 : }
    1562              : 
    1563              : /*
    1564              :  * Create and acquire the replication slot used to retain information for
    1565              :  * conflict detection, if that has not been done yet.
    1566              :  */
    1567              : void
    1568           64 : CreateConflictDetectionSlot(void)
    1569              : {
    1570              :     /* Exit early if the replication slot has already been created and acquired */
    1571           64 :     if (MyReplicationSlot)
    1572           60 :         return;
    1573              : 
    1574            4 :     ereport(LOG,
    1575              :             errmsg("creating replication conflict detection slot"));
    1576              : 
    1577            4 :     ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
    1578              :                           false, false);
    1579              : 
    1580            4 :     init_conflict_slot_xmin();
    1581              : }
    1582              : 
    1583              : /*
    1584              :  * Is current process the logical replication launcher?
    1585              :  */
    1586              : bool
    1587         2546 : IsLogicalLauncher(void)
    1588              : {
    1589         2546 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1590              : }
    1591              : 
    1592              : /*
    1593              :  * Return the pid of the leader apply worker if the given pid is the pid of a
    1594              :  * parallel apply worker; otherwise, return InvalidPid.
    1595              :  */
    1596              : pid_t
    1597          967 : GetLeaderApplyWorkerPid(pid_t pid)
    1598              : {
    1599          967 :     int         leader_pid = InvalidPid;
    1600              :     int         i;
    1601              : 
    1602          967 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1603              : 
    1604         4835 :     for (i = 0; i < max_logical_replication_workers; i++)
    1605              :     {
    1606         3868 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1607              : 
    1608         3868 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
    1609              :         {
    1610            0 :             leader_pid = w->leader_pid;
    1611            0 :             break;
    1612              :         }
    1613              :     }
    1614              : 
    1615          967 :     LWLockRelease(LogicalRepWorkerLock);
    1616              : 
    1617          967 :     return leader_pid;
    1618              : }
    1619              : 
    1620              : /*
    1621              :  * Returns the state of the subscription workers.
    1622              :  */
    1623              : Datum
    1624            1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1625              : {
    1626              : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
    1627            1 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1628              :     int         i;
    1629            1 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1630              : 
    1631            1 :     InitMaterializedSRF(fcinfo, 0);
    1632              : 
    1633              :     /* Make sure we get consistent view of the workers. */
    1634            1 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1635              : 
    1636            5 :     for (i = 0; i < max_logical_replication_workers; i++)
    1637              :     {
    1638              :         /* for each row */
    1639            4 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1640            4 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1641              :         int         worker_pid;
    1642              :         LogicalRepWorker worker;
    1643              : 
    1644            4 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1645              :                sizeof(LogicalRepWorker));
    1646            4 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1647            2 :             continue;
    1648              : 
    1649            2 :         if (OidIsValid(subid) && worker.subid != subid)
    1650            0 :             continue;
    1651              : 
    1652            2 :         worker_pid = worker.proc->pid;
    1653              : 
    1654            2 :         values[0] = ObjectIdGetDatum(worker.subid);
    1655            2 :         if (isTableSyncWorker(&worker))
    1656            0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1657              :         else
    1658            2 :             nulls[1] = true;
    1659            2 :         values[2] = Int32GetDatum(worker_pid);
    1660              : 
    1661            2 :         if (isParallelApplyWorker(&worker))
    1662            0 :             values[3] = Int32GetDatum(worker.leader_pid);
    1663              :         else
    1664            2 :             nulls[3] = true;
    1665              : 
    1666            2 :         if (!XLogRecPtrIsValid(worker.last_lsn))
    1667            1 :             nulls[4] = true;
    1668              :         else
    1669            1 :             values[4] = LSNGetDatum(worker.last_lsn);
    1670            2 :         if (worker.last_send_time == 0)
    1671            0 :             nulls[5] = true;
    1672              :         else
    1673            2 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
    1674            2 :         if (worker.last_recv_time == 0)
    1675            0 :             nulls[6] = true;
    1676              :         else
    1677            2 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1678            2 :         if (!XLogRecPtrIsValid(worker.reply_lsn))
    1679            1 :             nulls[7] = true;
    1680              :         else
    1681            1 :             values[7] = LSNGetDatum(worker.reply_lsn);
    1682            2 :         if (worker.reply_time == 0)
    1683            0 :             nulls[8] = true;
    1684              :         else
    1685            2 :             values[8] = TimestampTzGetDatum(worker.reply_time);
    1686              : 
    1687            2 :         switch (worker.type)
    1688              :         {
    1689            2 :             case WORKERTYPE_APPLY:
    1690            2 :                 values[9] = CStringGetTextDatum("apply");
    1691            2 :                 break;
    1692            0 :             case WORKERTYPE_PARALLEL_APPLY:
    1693            0 :                 values[9] = CStringGetTextDatum("parallel apply");
    1694            0 :                 break;
    1695            0 :             case WORKERTYPE_SEQUENCESYNC:
    1696            0 :                 values[9] = CStringGetTextDatum("sequence synchronization");
    1697            0 :                 break;
    1698            0 :             case WORKERTYPE_TABLESYNC:
    1699            0 :                 values[9] = CStringGetTextDatum("table synchronization");
    1700            0 :                 break;
    1701            0 :             case WORKERTYPE_UNKNOWN:
    1702              :                 /* Should never happen. */
    1703            0 :                 elog(ERROR, "unknown worker type");
    1704              :         }
    1705              : 
    1706            2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1707              :                              values, nulls);
    1708              : 
    1709              :         /*
    1710              :          * If only a single subscription was requested, and we found it,
    1711              :          * break.
    1712              :          */
    1713            2 :         if (OidIsValid(subid))
    1714            0 :             break;
    1715              :     }
    1716              : 
    1717            1 :     LWLockRelease(LogicalRepWorkerLock);
    1718              : 
    1719            1 :     return (Datum) 0;
    1720              : }
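
The function above follows the standard materialized set-returning-function recipe from funcapi.h: InitMaterializedSRF() builds the tuplestore and expected tuple descriptor, each row is appended with tuplestore_putvalues(), and the Datum return value is ignored. A minimal hedged skeleton with an illustrative name and a single int4 column:

    #include "postgres.h"
    #include "funcapi.h"

    PG_FUNCTION_INFO_V1(my_stat_func);

    Datum
    my_stat_func(PG_FUNCTION_ARGS)
    {
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
        Datum       values[1] = {0};
        bool        nulls[1] = {0};

        /* Sets up rsinfo->setResult and rsinfo->setDesc for us. */
        InitMaterializedSRF(fcinfo, 0);

        values[0] = Int32GetDatum(42);
        tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                             values, nulls);

        return (Datum) 0;
    }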
        

Generated by: LCOV version 2.0-1