LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions)
Test: PostgreSQL 18devel
Date: 2024-12-12 19:15:15
                  Hit    Total    Coverage
Lines:            408    465     87.7 %
Functions:        31     31      100.0 %
Legend: Lines: hit | not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2024, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
      11             :  *    This module contains the logical replication worker launcher which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "access/heapam.h"
      21             : #include "access/htup.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/pg_subscription.h"
      26             : #include "catalog/pg_subscription_rel.h"
      27             : #include "funcapi.h"
      28             : #include "lib/dshash.h"
      29             : #include "miscadmin.h"
      30             : #include "pgstat.h"
      31             : #include "postmaster/bgworker.h"
      32             : #include "postmaster/interrupt.h"
      33             : #include "replication/logicallauncher.h"
      34             : #include "replication/slot.h"
      35             : #include "replication/walreceiver.h"
      36             : #include "replication/worker_internal.h"
      37             : #include "storage/ipc.h"
      38             : #include "storage/proc.h"
      39             : #include "storage/procarray.h"
      40             : #include "tcop/tcopprot.h"
      41             : #include "utils/builtins.h"
      42             : #include "utils/memutils.h"
      43             : #include "utils/pg_lsn.h"
      44             : #include "utils/snapmgr.h"
      45             : 
      46             : /* max sleep time between cycles (3min) */
      47             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      48             : 
      49             : /* GUC variables */
      50             : int         max_logical_replication_workers = 4;
      51             : int         max_sync_workers_per_subscription = 2;
      52             : int         max_parallel_apply_workers_per_subscription = 2;
      53             : 
      54             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      55             : 
      56             : typedef struct LogicalRepCtxStruct
      57             : {
      58             :     /* Supervisor process. */
      59             :     pid_t       launcher_pid;
      60             : 
      61             :     /* Hash table holding last start times of subscriptions' apply workers. */
      62             :     dsa_handle  last_start_dsa;
      63             :     dshash_table_handle last_start_dsh;
      64             : 
      65             :     /* Background workers. */
      66             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      67             : } LogicalRepCtxStruct;
      68             : 
      69             : static LogicalRepCtxStruct *LogicalRepCtx;
      70             : 
      71             : /* an entry in the last-start-times shared hash table */
      72             : typedef struct LauncherLastStartTimesEntry
      73             : {
      74             :     Oid         subid;          /* OID of logrep subscription (hash key) */
      75             :     TimestampTz last_start_time;    /* last time its apply worker was started */
      76             : } LauncherLastStartTimesEntry;
      77             : 
      78             : /* parameters for the last-start-times shared hash table */
      79             : static const dshash_parameters dsh_params = {
      80             :     sizeof(Oid),
      81             :     sizeof(LauncherLastStartTimesEntry),
      82             :     dshash_memcmp,
      83             :     dshash_memhash,
      84             :     dshash_memcpy,
      85             :     LWTRANCHE_LAUNCHER_HASH
      86             : };
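/*
 * Note on the parameters above: dshash hashes and compares the raw key bytes
 * (dshash_memhash/dshash_memcmp over the first sizeof(Oid) bytes), so the
 * table is keyed directly on the subscription OID with no separate key
 * allocation per entry.
 */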
      87             : 
      88             : static dsa_area *last_start_times_dsa = NULL;
      89             : static dshash_table *last_start_times = NULL;
      90             : 
      91             : static bool on_commit_launcher_wakeup = false;
      92             : 
      93             : 
      94             : static void ApplyLauncherWakeup(void);
      95             : static void logicalrep_launcher_onexit(int code, Datum arg);
      96             : static void logicalrep_worker_onexit(int code, Datum arg);
      97             : static void logicalrep_worker_detach(void);
      98             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
      99             : static int  logicalrep_pa_worker_count(Oid subid);
     100             : static void logicalrep_launcher_attach_dshmem(void);
     101             : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
     102             : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
     103             : 
     104             : 
     105             : /*
     106             :  * Load the list of subscriptions.
     107             :  *
     108             :  * Only the fields of interest to the worker start/stop functions are
     109             :  * filled in for each subscription.
     110             :  */
     111             : static List *
     112        4474 : get_subscription_list(void)
     113             : {
     114        4474 :     List       *res = NIL;
     115             :     Relation    rel;
     116             :     TableScanDesc scan;
     117             :     HeapTuple   tup;
     118             :     MemoryContext resultcxt;
     119             : 
     120             :     /* This is the context that we will allocate our output data in */
     121        4474 :     resultcxt = CurrentMemoryContext;
     122             : 
     123             :     /*
     124             :      * Start a transaction so we can access pg_subscription, and get a snapshot.
     125             :      * We don't have a use for the snapshot itself, but we're interested in
     126             :      * the secondary effect that it sets RecentGlobalXmin.  (This is critical
     127             :      * for anything that reads heap pages, because HOT may decide to prune
     128             :      * them even if the process doesn't attempt to modify any tuples.)
     129             :      *
     130             :      * FIXME: This comment is inaccurate and the code buggy.  A snapshot that
     131             :      * is not pushed/active does not reliably prevent HOT pruning (e.g., the
     132             :      * ->xmin could be cleared when cache invalidations are processed).
     133             :      */
     134        4474 :     StartTransactionCommand();
     135        4474 :     (void) GetTransactionSnapshot();
     136             : 
     137        4474 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     138        4474 :     scan = table_beginscan_catalog(rel, 0, NULL);
     139             : 
     140        5268 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     141             :     {
     142         794 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     143             :         Subscription *sub;
     144             :         MemoryContext oldcxt;
     145             : 
     146             :         /*
     147             :          * Allocate our results in the caller's context, not the
     148             :          * transaction's. We do this inside the loop, and restore the original
     149             :          * context at the end, so that leaky things like heap_getnext() are
     150             :          * not called in a potentially long-lived context.
     151             :          */
     152         794 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     153             : 
     154         794 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     155         794 :         sub->oid = subform->oid;
     156         794 :         sub->dbid = subform->subdbid;
     157         794 :         sub->owner = subform->subowner;
     158         794 :         sub->enabled = subform->subenabled;
     159         794 :         sub->name = pstrdup(NameStr(subform->subname));
     160             :         /* We don't fill fields we are not interested in. */
     161             : 
     162         794 :         res = lappend(res, sub);
     163         794 :         MemoryContextSwitchTo(oldcxt);
     164             :     }
     165             : 
     166        4474 :     table_endscan(scan);
     167        4474 :     table_close(rel, AccessShareLock);
     168             : 
     169        4474 :     CommitTransactionCommand();
     170             : 
     171        4474 :     return res;
     172             : }
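
/*
 * A minimal usage sketch (editorial; not part of launcher.c) of how a caller
 * such as the launcher's main loop might consume get_subscription_list().
 * Memory-context management is omitted, and the worker lookup is an
 * assumption modeled on ApplyLauncherMain().
 */
static void
launcher_scan_subscriptions_sketch(void)
{
    List       *sublist = get_subscription_list();
    ListCell   *lc;

    foreach(lc, sublist)
    {
        Subscription *sub = (Subscription *) lfirst(lc);
        LogicalRepWorker *w;

        /* Only enabled subscriptions need an apply worker. */
        if (!sub->enabled)
            continue;

        /* The workers array must be inspected under the lock. */
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
        w = logicalrep_worker_find(sub->oid, InvalidOid, false);
        LWLockRelease(LogicalRepWorkerLock);

        if (w == NULL)
        {
            /* ... consider starting an apply worker for sub ... */
        }
    }
}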
     173             : 
     174             : /*
     175             :  * Wait for a background worker to start up and attach to the shmem context.
     176             :  *
     177             :  * This is only needed for cleaning up the shared memory in case the worker
     178             :  * fails to attach.
     179             :  *
     180             :  * Returns whether the attach was successful.
     181             :  */
     182             : static bool
     183        3310 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     184             :                                uint16 generation,
     185             :                                BackgroundWorkerHandle *handle)
     186             : {
     187             :     BgwHandleStatus status;
     188             :     int         rc;
     189             : 
     190             :     for (;;)
     191        2654 :     {
     192             :         pid_t       pid;
     193             : 
     194        3310 :         CHECK_FOR_INTERRUPTS();
     195             : 
     196        3310 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     197             : 
     198             :         /* Worker either died or has started. Return false if died. */
     199        3310 :         if (!worker->in_use || worker->proc)
     200             :         {
     201         652 :             LWLockRelease(LogicalRepWorkerLock);
     202         652 :             return worker->in_use;
     203             :         }
     204             : 
     205        2658 :         LWLockRelease(LogicalRepWorkerLock);
     206             : 
     207             :         /* Check if worker has died before attaching, and clean up after it. */
     208        2658 :         status = GetBackgroundWorkerPid(handle, &pid);
     209             : 
     210        2658 :         if (status == BGWH_STOPPED)
     211             :         {
     212           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     213             :             /* Ensure that this was indeed the worker we waited for. */
     214           0 :             if (generation == worker->generation)
     215           0 :                 logicalrep_worker_cleanup(worker);
     216           0 :             LWLockRelease(LogicalRepWorkerLock);
     217           0 :             return false;
     218             :         }
     219             : 
     220             :         /*
     221             :          * We need a timeout because we generally don't get notified via latch
     222             :          * about the worker attach.  But we don't expect to have to wait long.
     223             :          */
     224        2658 :         rc = WaitLatch(MyLatch,
     225             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     226             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     227             : 
     228        2658 :         if (rc & WL_LATCH_SET)
     229             :         {
     230         864 :             ResetLatch(MyLatch);
     231         864 :             CHECK_FOR_INTERRUPTS();
     232             :         }
     233             :     }
     234             : }
     235             : 
     236             : /*
     237             :  * Walks the workers array and searches for one that matches the given
     238             :  * subscription id and relid.
     239             :  *
     240             :  * We are only interested in the leader apply worker or table sync worker.
     241             :  */
     242             : LogicalRepWorker *
     243        4298 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     244             : {
     245             :     int         i;
     246        4298 :     LogicalRepWorker *res = NULL;
     247             : 
     248             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     249             : 
     250             :     /* Search for attached worker for a given subscription id. */
     251       12988 :     for (i = 0; i < max_logical_replication_workers; i++)
     252             :     {
     253       11396 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     254             : 
     255             :         /* Skip parallel apply workers. */
     256       11396 :         if (isParallelApplyWorker(w))
     257           0 :             continue;
     258             : 
     259       11396 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     260        2706 :             (!only_running || w->proc))
     261             :         {
     262        2706 :             res = w;
     263        2706 :             break;
     264             :         }
     265             :     }
     266             : 
     267        4298 :     return res;
     268             : }
     269             : 
     270             : /*
     271             :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     272             :  * the subscription, instead of just one.
     273             :  */
     274             : List *
     275         974 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
     276             : {
     277             :     int         i;
     278         974 :     List       *res = NIL;
     279             : 
     280         974 :     if (acquire_lock)
     281         200 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     282             : 
     283             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     284             : 
     285             :     /* Search for attached worker for a given subscription id. */
     286        5066 :     for (i = 0; i < max_logical_replication_workers; i++)
     287             :     {
     288        4092 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     289             : 
     290        4092 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     291         658 :             res = lappend(res, w);
     292             :     }
     293             : 
     294         974 :     if (acquire_lock)
     295         200 :         LWLockRelease(LogicalRepWorkerLock);
     296             : 
     297         974 :     return res;
     298             : }
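
/*
 * A usage sketch (editorial assumption based on DROP SUBSCRIPTION in
 * subscriptioncmds.c): collect the subscription's workers, letting the
 * function take the lock itself, then stop them one by one.
 */
static void
stop_all_subscription_workers_sketch(Oid subid)
{
    List       *subworkers;
    ListCell   *lc;

    /* acquire_lock = true: the lock is taken and released internally. */
    subworkers = logicalrep_workers_find(subid, false, true);

    foreach(lc, subworkers)
    {
        LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);

        logicalrep_worker_stop(w->subid, w->relid);
    }
    list_free(subworkers);
}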
     299             : 
     300             : /*
     301             :  * Start new logical replication background worker, if possible.
     302             :  *
     303             :  * Returns true on success, false on failure.
     304             :  */
     305             : bool
     306         656 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
     307             :                          Oid dbid, Oid subid, const char *subname, Oid userid,
     308             :                          Oid relid, dsm_handle subworker_dsm)
     309             : {
     310             :     BackgroundWorker bgw;
     311             :     BackgroundWorkerHandle *bgw_handle;
     312             :     uint16      generation;
     313             :     int         i;
     314         656 :     int         slot = 0;
     315         656 :     LogicalRepWorker *worker = NULL;
     316             :     int         nsyncworkers;
     317             :     int         nparallelapplyworkers;
     318             :     TimestampTz now;
     319         656 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
     320         656 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
     321             : 
     322             :     /*----------
     323             :      * Sanity checks:
     324             :      * - must be valid worker type
     325             :      * - tablesync workers are the only ones to have relid
     326             :      * - parallel apply worker is the only kind of subworker
     327             :      */
     328             :     Assert(wtype != WORKERTYPE_UNKNOWN);
     329             :     Assert(is_tablesync_worker == OidIsValid(relid));
     330             :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
     331             : 
     332         656 :     ereport(DEBUG1,
     333             :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     334             :                              subname)));
     335             : 
     336             :     /* Report this after the initial starting message for consistency. */
     337         656 :     if (max_replication_slots == 0)
     338           0 :         ereport(ERROR,
     339             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     340             :                  errmsg("cannot start logical replication workers when \"max_replication_slots\"=0")));
     341             : 
     342             :     /*
     343             :      * We must modify the shared memory under the lock so that we have a
     344             :      * consistent view.
     345             :      */
     346         656 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     347             : 
     348         656 : retry:
     349             :     /* Find unused worker slot. */
     350        1186 :     for (i = 0; i < max_logical_replication_workers; i++)
     351             :     {
     352        1186 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     353             : 
     354        1186 :         if (!w->in_use)
     355             :         {
     356         656 :             worker = w;
     357         656 :             slot = i;
     358         656 :             break;
     359             :         }
     360             :     }
     361             : 
     362         656 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     363             : 
     364         656 :     now = GetCurrentTimestamp();
     365             : 
     366             :     /*
     367             :      * If we didn't find a free slot, try to do garbage collection.  We do
     368             :      * this because if a worker failed to start up and its parent crashed
     369             :      * while waiting, the in_use state was never cleared.
     370             :      */
     371         656 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     372             :     {
     373           0 :         bool        did_cleanup = false;
     374             : 
     375           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     376             :         {
     377           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     378             : 
     379             :             /*
     380             :              * If the worker was marked in use but didn't manage to attach in
     381             :              * time, clean it up.
     382             :              */
     383           0 :             if (w->in_use && !w->proc &&
     384           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     385             :                                            wal_receiver_timeout))
     386             :             {
     387           0 :                 elog(WARNING,
     388             :                      "logical replication worker for subscription %u took too long to start; canceled",
     389             :                      w->subid);
     390             : 
     391           0 :                 logicalrep_worker_cleanup(w);
     392           0 :                 did_cleanup = true;
     393             :             }
     394             :         }
     395             : 
     396           0 :         if (did_cleanup)
     397           0 :             goto retry;
     398             :     }
     399             : 
     400             :     /*
     401             :      * We don't allow starting more sync workers once the per-subscription
     402             :      * sync worker limit has been reached.  Just return silently, as we
     403             :      * might get here because of an otherwise harmless race condition.
     404             :      */
     405         656 :     if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
     406             :     {
     407           0 :         LWLockRelease(LogicalRepWorkerLock);
     408           0 :         return false;
     409             :     }
     410             : 
     411         656 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     412             : 
     413             :     /*
     414             :      * Return false if the number of parallel apply workers has reached the
     415             :      * per-subscription limit.
     416             :      */
     417         656 :     if (is_parallel_apply_worker &&
     418          20 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     419             :     {
     420           0 :         LWLockRelease(LogicalRepWorkerLock);
     421           0 :         return false;
     422             :     }
     423             : 
     424             :     /*
     425             :      * However, if there are no more free worker slots, inform the user
     426             :      * before exiting.
     427             :      */
     428         656 :     if (worker == NULL)
     429             :     {
     430           0 :         LWLockRelease(LogicalRepWorkerLock);
     431           0 :         ereport(WARNING,
     432             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     433             :                  errmsg("out of logical replication worker slots"),
     434             :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
     435           0 :         return false;
     436             :     }
     437             : 
     438             :     /* Prepare the worker slot. */
     439         656 :     worker->type = wtype;
     440         656 :     worker->launch_time = now;
     441         656 :     worker->in_use = true;
     442         656 :     worker->generation++;
     443         656 :     worker->proc = NULL;
     444         656 :     worker->dbid = dbid;
     445         656 :     worker->userid = userid;
     446         656 :     worker->subid = subid;
     447         656 :     worker->relid = relid;
     448         656 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     449         656 :     worker->relstate_lsn = InvalidXLogRecPtr;
     450         656 :     worker->stream_fileset = NULL;
     451         656 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     452         656 :     worker->parallel_apply = is_parallel_apply_worker;
     453         656 :     worker->last_lsn = InvalidXLogRecPtr;
     454         656 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     455         656 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     456         656 :     worker->reply_lsn = InvalidXLogRecPtr;
     457         656 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     458             : 
     459             :     /* Before releasing lock, remember generation for future identification. */
     460         656 :     generation = worker->generation;
     461             : 
     462         656 :     LWLockRelease(LogicalRepWorkerLock);
     463             : 
     464             :     /* Register the new dynamic worker. */
     465         656 :     memset(&bgw, 0, sizeof(bgw));
     466         656 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     467             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     468         656 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     469         656 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     470             : 
     471         656 :     switch (worker->type)
     472             :     {
     473         270 :         case WORKERTYPE_APPLY:
     474         270 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     475         270 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     476             :                      "logical replication apply worker for subscription %u",
     477             :                      subid);
     478         270 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
     479         270 :             break;
     480             : 
     481          20 :         case WORKERTYPE_PARALLEL_APPLY:
     482          20 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     483          20 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     484             :                      "logical replication parallel apply worker for subscription %u",
     485             :                      subid);
     486          20 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     487             : 
     488          20 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     489          20 :             break;
     490             : 
     491         366 :         case WORKERTYPE_TABLESYNC:
     492         366 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
     493         366 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     494             :                      "logical replication tablesync worker for subscription %u sync %u",
     495             :                      subid,
     496             :                      relid);
     497         366 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
     498         366 :             break;
     499             : 
     500           0 :         case WORKERTYPE_UNKNOWN:
     501             :             /* Should never happen. */
     502           0 :             elog(ERROR, "unknown worker type");
     503             :     }
     504             : 
     505         656 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     506         656 :     bgw.bgw_notify_pid = MyProcPid;
     507         656 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     508             : 
     509         656 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     510             :     {
     511             :         /* Failed to start worker, so clean up the worker slot. */
     512           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     513             :         Assert(generation == worker->generation);
     514           0 :         logicalrep_worker_cleanup(worker);
     515           0 :         LWLockRelease(LogicalRepWorkerLock);
     516             : 
     517           0 :         ereport(WARNING,
     518             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     519             :                  errmsg("out of background worker slots"),
     520             :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
     521           0 :         return false;
     522             :     }
     523             : 
     524             :     /* Now wait until it attaches. */
     525         656 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     526             : }
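
/*
 * A minimal call sketch (editorial): starting the per-subscription leader
 * apply worker, which has no relid and no parallel-apply DSM segment.  The
 * launcher is assumed to do essentially this from its main loop:
 *
 *      logicalrep_worker_launch(WORKERTYPE_APPLY, sub->dbid, sub->oid,
 *                               sub->name, sub->owner, InvalidOid,
 *                               DSM_HANDLE_INVALID);
 */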
     527             : 
     528             : /*
     529             :  * Internal function to stop the worker and wait until it detaches from the
     530             :  * slot.
     531             :  */
     532             : static void
     533         146 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
     534             : {
     535             :     uint16      generation;
     536             : 
     537             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
     538             : 
     539             :     /*
     540             :      * Remember the worker's generation so we can later check whether we are
     541             :      * still looking at the same worker.
     542             :      */
     543         146 :     generation = worker->generation;
     544             : 
     545             :     /*
     546             :      * If we found a worker but it does not have proc set, it is still
     547             :      * starting up; wait for it to finish starting and then kill it.
     548             :      */
     549         156 :     while (worker->in_use && !worker->proc)
     550             :     {
     551             :         int         rc;
     552             : 
     553          16 :         LWLockRelease(LogicalRepWorkerLock);
     554             : 
     555             :         /* Wait a bit --- we don't expect to have to wait long. */
     556          16 :         rc = WaitLatch(MyLatch,
     557             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     558             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     559             : 
     560          16 :         if (rc & WL_LATCH_SET)
     561             :         {
     562           0 :             ResetLatch(MyLatch);
     563           0 :             CHECK_FOR_INTERRUPTS();
     564             :         }
     565             : 
     566             :         /* Recheck worker status. */
     567          16 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     568             : 
     569             :         /*
     570             :          * Check whether the worker slot is no longer used, which would mean
     571             :          * that the worker has exited, or whether the worker generation is
     572             :          * different, meaning that a different worker has taken the slot.
     573             :          */
     574          16 :         if (!worker->in_use || worker->generation != generation)
     575           0 :             return;
     576             : 
     577             :         /* Worker has assigned proc, so it has started. */
     578          16 :         if (worker->proc)
     579           6 :             break;
     580             :     }
     581             : 
     582             :     /* Now terminate the worker ... */
     583         146 :     kill(worker->proc->pid, signo);
     584             : 
     585             :     /* ... and wait for it to die. */
     586             :     for (;;)
     587         180 :     {
     588             :         int         rc;
     589             : 
     590             :         /* is it gone? */
     591         326 :         if (!worker->proc || worker->generation != generation)
     592             :             break;
     593             : 
     594         180 :         LWLockRelease(LogicalRepWorkerLock);
     595             : 
     596             :         /* Wait a bit --- we don't expect to have to wait long. */
     597         180 :         rc = WaitLatch(MyLatch,
     598             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     599             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     600             : 
     601         180 :         if (rc & WL_LATCH_SET)
     602             :         {
     603          36 :             ResetLatch(MyLatch);
     604          36 :             CHECK_FOR_INTERRUPTS();
     605             :         }
     606             : 
     607         180 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     608             :     }
     609             : }
     610             : 
     611             : /*
     612             :  * Stop the logical replication worker for subid/relid, if any.
     613             :  */
     614             : void
     615         166 : logicalrep_worker_stop(Oid subid, Oid relid)
     616             : {
     617             :     LogicalRepWorker *worker;
     618             : 
     619         166 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     620             : 
     621         166 :     worker = logicalrep_worker_find(subid, relid, false);
     622             : 
     623         166 :     if (worker)
     624             :     {
     625             :         Assert(!isParallelApplyWorker(worker));
     626         128 :         logicalrep_worker_stop_internal(worker, SIGTERM);
     627             :     }
     628             : 
     629         166 :     LWLockRelease(LogicalRepWorkerLock);
     630         166 : }
     631             : 
     632             : /*
     633             :  * Stop the given logical replication parallel apply worker.
     634             :  *
     635             :  * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
     636             :  * worker so that the worker exits cleanly.
     637             :  */
     638             : void
     639          10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
     640             : {
     641             :     int         slot_no;
     642             :     uint16      generation;
     643             :     LogicalRepWorker *worker;
     644             : 
     645          10 :     SpinLockAcquire(&winfo->shared->mutex);
     646          10 :     generation = winfo->shared->logicalrep_worker_generation;
     647          10 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
     648          10 :     SpinLockRelease(&winfo->shared->mutex);
     649             : 
     650             :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
     651             : 
     652             :     /*
     653             :      * Detach from the error_mq_handle for the parallel apply worker before
     654             :      * stopping it.  This prevents the leader apply worker from trying to
     655             :      * receive a message from an error queue that the parallel apply worker
     656             :      * might already have detached.
     657             :      */
     658          10 :     if (winfo->error_mq_handle)
     659             :     {
     660          10 :         shm_mq_detach(winfo->error_mq_handle);
     661          10 :         winfo->error_mq_handle = NULL;
     662             :     }
     663             : 
     664          10 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     665             : 
     666          10 :     worker = &LogicalRepCtx->workers[slot_no];
     667             :     Assert(isParallelApplyWorker(worker));
     668             : 
     669             :     /*
     670             :      * Only stop the worker if the generation matches and the worker is alive.
     671             :      */
     672          10 :     if (worker->generation == generation && worker->proc)
     673          10 :         logicalrep_worker_stop_internal(worker, SIGINT);
     674             : 
     675          10 :     LWLockRelease(LogicalRepWorkerLock);
     676          10 : }
     677             : 
     678             : /*
     679             :  * Wake up (using latch) any logical replication worker for the specified sub/rel.
     680             :  */
     681             : void
     682         400 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     683             : {
     684             :     LogicalRepWorker *worker;
     685             : 
     686         400 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     687             : 
     688         400 :     worker = logicalrep_worker_find(subid, relid, true);
     689             : 
     690         400 :     if (worker)
     691         400 :         logicalrep_worker_wakeup_ptr(worker);
     692             : 
     693         400 :     LWLockRelease(LogicalRepWorkerLock);
     694         400 : }
     695             : 
     696             : /*
     697             :  * Wake up (using latch) the specified logical replication worker.
     698             :  *
     699             :  * Caller must hold lock, else worker->proc could change under us.
     700             :  */
     701             : void
     702        1190 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     703             : {
     704             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     705             : 
     706        1190 :     SetLatch(&worker->proc->procLatch);
     707        1190 : }
     708             : 
     709             : /*
     710             :  * Attach to a slot.
     711             :  */
     712             : void
     713         802 : logicalrep_worker_attach(int slot)
     714             : {
     715             :     /* Block concurrent access. */
     716         802 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     717             : 
     718             :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     719         802 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     720             : 
     721         802 :     if (!MyLogicalRepWorker->in_use)
     722             :     {
     723           0 :         LWLockRelease(LogicalRepWorkerLock);
     724           0 :         ereport(ERROR,
     725             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     726             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     727             :                         slot)));
     728             :     }
     729             : 
     730         802 :     if (MyLogicalRepWorker->proc)
     731             :     {
     732           0 :         LWLockRelease(LogicalRepWorkerLock);
     733           0 :         ereport(ERROR,
     734             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     735             :                  errmsg("logical replication worker slot %d is already used by "
     736             :                         "another worker, cannot attach", slot)));
     737             :     }
     738             : 
     739         802 :     MyLogicalRepWorker->proc = MyProc;
     740         802 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     741             : 
     742         802 :     LWLockRelease(LogicalRepWorkerLock);
     743         802 : }
     744             : 
     745             : /*
     746             :  * Stop the parallel apply workers if any, and detach the leader apply worker
     747             :  * (cleans up the worker info).
     748             :  */
     749             : static void
     750         802 : logicalrep_worker_detach(void)
     751             : {
     752             :     /* Stop the parallel apply workers. */
     753         802 :     if (am_leader_apply_worker())
     754             :     {
     755             :         List       *workers;
     756             :         ListCell   *lc;
     757             : 
     758             :         /*
     759             :          * Detach from the error_mq_handle for all parallel apply workers
     760             :          * before terminating them.  This prevents the leader apply worker
     761             :          * from receiving the worker termination message and sending it to
     762             :          * the logs when the parallel worker has already done so.
     763             :          */
     764         412 :         pa_detach_all_error_mq();
     765             : 
     766         412 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     767             : 
     768         412 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
     769         838 :         foreach(lc, workers)
     770             :         {
     771         426 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     772             : 
     773         426 :             if (isParallelApplyWorker(w))
     774           8 :                 logicalrep_worker_stop_internal(w, SIGTERM);
     775             :         }
     776             : 
     777         412 :         LWLockRelease(LogicalRepWorkerLock);
     778             :     }
     779             : 
     780             :     /* Block concurrent access. */
     781         802 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     782             : 
     783         802 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     784             : 
     785         802 :     LWLockRelease(LogicalRepWorkerLock);
     786         802 : }
     787             : 
     788             : /*
     789             :  * Clean up worker info.
     790             :  */
     791             : static void
     792         802 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     793             : {
     794             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     795             : 
     796         802 :     worker->type = WORKERTYPE_UNKNOWN;
     797         802 :     worker->in_use = false;
     798         802 :     worker->proc = NULL;
     799         802 :     worker->dbid = InvalidOid;
     800         802 :     worker->userid = InvalidOid;
     801         802 :     worker->subid = InvalidOid;
     802         802 :     worker->relid = InvalidOid;
     803         802 :     worker->leader_pid = InvalidPid;
     804         802 :     worker->parallel_apply = false;
     805         802 : }
     806             : 
     807             : /*
     808             :  * Cleanup function for logical replication launcher.
     809             :  *
     810             :  * Called on logical replication launcher exit.
     811             :  */
     812             : static void
     813         738 : logicalrep_launcher_onexit(int code, Datum arg)
     814             : {
     815         738 :     LogicalRepCtx->launcher_pid = 0;
     816         738 : }
     817             : 
     818             : /*
     819             :  * Cleanup function.
     820             :  *
     821             :  * Called on logical replication worker exit.
     822             :  */
     823             : static void
     824         802 : logicalrep_worker_onexit(int code, Datum arg)
     825             : {
     826             :     /* Disconnect gracefully from the remote side. */
     827         802 :     if (LogRepWorkerWalRcvConn)
     828         728 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
     829             : 
     830         802 :     logicalrep_worker_detach();
     831             : 
     832             :     /* Cleanup fileset used for streaming transactions. */
     833         802 :     if (MyLogicalRepWorker->stream_fileset != NULL)
     834          28 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     835             : 
     836             :     /*
     837             :      * Session level locks may be acquired outside of a transaction in
     838             :      * parallel apply mode and will not be released when the worker
     839             :      * terminates, so manually release all locks before the worker exits.
     840             :      *
     841             :      * The locks will be acquired once the worker is initialized.
     842             :      */
     843         802 :     if (!InitializingApplyWorker)
     844         790 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
     845             : 
     846         802 :     ApplyLauncherWakeup();
     847         802 : }
     848             : 
     849             : /*
     850             :  * Count the number of registered (not necessarily running) sync workers
     851             :  * for a subscription.
     852             :  */
     853             : int
     854        1892 : logicalrep_sync_worker_count(Oid subid)
     855             : {
     856             :     int         i;
     857        1892 :     int         res = 0;
     858             : 
     859             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     860             : 
     861             :     /* Search for attached worker for a given subscription id. */
     862        9796 :     for (i = 0; i < max_logical_replication_workers; i++)
     863             :     {
     864        7904 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     865             : 
     866        7904 :         if (isTablesyncWorker(w) && w->subid == subid)
     867        1964 :             res++;
     868             :     }
     869             : 
     870        1892 :     return res;
     871             : }
     872             : 
     873             : /*
     874             :  * Count the number of registered (but not necessarily running) parallel apply
     875             :  * workers for a subscription.
     876             :  */
     877             : static int
     878         656 : logicalrep_pa_worker_count(Oid subid)
     879             : {
     880             :     int         i;
     881         656 :     int         res = 0;
     882             : 
     883             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     884             : 
     885             :     /*
     886             :      * Scan all attached parallel apply workers, only counting those which
     887             :      * have the given subscription id.
     888             :      */
     889        3468 :     for (i = 0; i < max_logical_replication_workers; i++)
     890             :     {
     891        2812 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     892             : 
     893        2812 :         if (isParallelApplyWorker(w) && w->subid == subid)
     894           4 :             res++;
     895             :     }
     896             : 
     897         656 :     return res;
     898             : }
     899             : 
     900             : /*
     901             :  * ApplyLauncherShmemSize
     902             :  *      Compute space needed for replication launcher shared memory
     903             :  */
     904             : Size
     905        7362 : ApplyLauncherShmemSize(void)
     906             : {
     907             :     Size        size;
     908             : 
     909             :     /*
     910             :      * Need the fixed struct and the array of LogicalRepWorker.
     911             :      */
     912        7362 :     size = sizeof(LogicalRepCtxStruct);
     913        7362 :     size = MAXALIGN(size);
     914        7362 :     size = add_size(size, mul_size(max_logical_replication_workers,
     915             :                                    sizeof(LogicalRepWorker)));
     916        7362 :     return size;
     917             : }
     918             : 
     919             : /*
     920             :  * ApplyLauncherRegister
     921             :  *      Register a background worker running the logical replication launcher.
     922             :  */
     923             : void
     924        1534 : ApplyLauncherRegister(void)
     925             : {
     926             :     BackgroundWorker bgw;
     927             : 
     928             :     /*
     929             :      * The logical replication launcher is disabled during binary upgrades, to
     930             :      * prevent logical replication workers from running on the source cluster.
     931             :      * That could cause replication origins to move forward after having been
     932             :      * copied to the target cluster, potentially creating conflicts with the
     933             :      * copied data files.
     934             :      */
     935        1534 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
     936          54 :         return;
     937             : 
     938        1480 :     memset(&bgw, 0, sizeof(bgw));
     939        1480 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     940             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     941        1480 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     942        1480 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     943        1480 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     944        1480 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     945             :              "logical replication launcher");
     946        1480 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
     947             :              "logical replication launcher");
     948        1480 :     bgw.bgw_restart_time = 5;
     949        1480 :     bgw.bgw_notify_pid = 0;
     950        1480 :     bgw.bgw_main_arg = (Datum) 0;
     951             : 
     952        1480 :     RegisterBackgroundWorker(&bgw);
     953             : }
     954             : 
     955             : /*
     956             :  * ApplyLauncherShmemInit
     957             :  *      Allocate and initialize replication launcher shared memory
     958             :  */
     959             : void
     960        1908 : ApplyLauncherShmemInit(void)
     961             : {
     962             :     bool        found;
     963             : 
     964        1908 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     965        1908 :         ShmemInitStruct("Logical Replication Launcher Data",
     966             :                         ApplyLauncherShmemSize(),
     967             :                         &found);
     968             : 
     969        1908 :     if (!found)
     970             :     {
     971             :         int         slot;
     972             : 
     973        1908 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     974             : 
     975        1908 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
     976        1908 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
     977             : 
     978             :         /* Initialize memory and spin locks for each worker slot. */
     979        9482 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
     980             :         {
     981        7574 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
     982             : 
     983        7574 :             memset(worker, 0, sizeof(LogicalRepWorker));
     984        7574 :             SpinLockInit(&worker->relmutex);
     985             :         }
     986             :     }
     987        1908 : }
     988             : 
     989             : /*
     990             :  * Initialize or attach to the dynamic shared hash table that stores the
     991             :  * last-start times, if not already done.
     992             :  * This must be called before accessing the table.
     993             :  */
     994             : static void
     995         868 : logicalrep_launcher_attach_dshmem(void)
     996             : {
     997             :     MemoryContext oldcontext;
     998             : 
     999             :     /* Quick exit if we already did this. */
    1000         868 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
    1001         770 :         last_start_times != NULL)
    1002         548 :         return;
    1003             : 
    1004             :     /* Otherwise, use a lock to ensure only one process creates the table. */
    1005         320 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    1006             : 
    1007             :     /* Be sure any local memory allocated by DSA routines is persistent. */
    1008         320 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1009             : 
    1010         320 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
    1011             :     {
    1012             :         /* Initialize dynamic shared hash table for last-start times. */
    1013          98 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
    1014          98 :         dsa_pin(last_start_times_dsa);
    1015          98 :         dsa_pin_mapping(last_start_times_dsa);
    1016          98 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
    1017             : 
    1018             :         /* Store handles in shared memory for other backends to use. */
    1019          98 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
    1020          98 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
    1021             :     }
    1022         222 :     else if (!last_start_times)
    1023             :     {
    1024             :         /* Attach to existing dynamic shared hash table. */
    1025         222 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
    1026         222 :         dsa_pin_mapping(last_start_times_dsa);
    1027         222 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
    1028         222 :                                          LogicalRepCtx->last_start_dsh, 0);
    1029             :     }
    1030             : 
    1031         320 :     MemoryContextSwitchTo(oldcontext);
    1032         320 :     LWLockRelease(LogicalRepWorkerLock);
    1033             : }
    1034             : 
    1035             : /*
    1036             :  * Set the last-start time for the subscription.
    1037             :  */
    1038             : static void
    1039         270 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
    1040             : {
    1041             :     LauncherLastStartTimesEntry *entry;
    1042             :     bool        found;
    1043             : 
    1044         270 :     logicalrep_launcher_attach_dshmem();
    1045             : 
    1046         270 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
    1047         270 :     entry->last_start_time = start_time;
    1048         270 :     dshash_release_lock(last_start_times, entry);
    1049         270 : }
    1050             : 
    1051             : /*
    1052             :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1053             :  */
    1054             : static TimestampTz
    1055         316 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1056             : {
    1057             :     LauncherLastStartTimesEntry *entry;
    1058             :     TimestampTz ret;
    1059             : 
    1060         316 :     logicalrep_launcher_attach_dshmem();
    1061             : 
    1062         316 :     entry = dshash_find(last_start_times, &subid, false);
    1063         316 :     if (entry == NULL)
    1064         230 :         return 0;
    1065             : 
    1066          86 :     ret = entry->last_start_time;
    1067          86 :     dshash_release_lock(last_start_times, entry);
    1068             : 
    1069          86 :     return ret;
    1070             : }
    1071             : 
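Both functions above rely on dshash's lock-coupled API: dshash_find_or_insert()
and a successful dshash_find() return with the entry's partition lock held
(exclusive for the setter; shared for the getter, which passes exclusive =
false), and the caller must drop it with dshash_release_lock(). A failed
lookup returns NULL with no lock held, which is why the early "return 0" above
needs no release. The same pattern in a minimal sketch, using a hypothetical
delete-if-stale helper:

    /* Hypothetical sketch: drop an entry older than some cutoff. */
    static void
    forget_if_stale(dshash_table *ht, Oid subid, TimestampTz cutoff)
    {
        LauncherLastStartTimesEntry *entry;

        entry = dshash_find(ht, &subid, true);  /* exclusive lock */
        if (entry == NULL)
            return;                             /* no lock to release */

        if (entry->last_start_time < cutoff)
            dshash_delete_entry(ht, entry);     /* releases the lock */
        else
            dshash_release_lock(ht, entry);
    }
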
    1072             : /*
    1073             :  * Remove the last-start-time entry for the subscription, if one exists.
    1074             :  *
    1075             :  * This has two use-cases: to remove the entry related to a subscription
    1076             :  * that's been deleted or disabled (just to avoid leaking shared memory),
    1077             :  * and to allow immediate restart of an apply worker that has exited
    1078             :  * due to subscription parameter changes.
    1079             :  */
    1080             : void
    1081         282 : ApplyLauncherForgetWorkerStartTime(Oid subid)
    1082             : {
    1083         282 :     logicalrep_launcher_attach_dshmem();
    1084             : 
    1085         282 :     (void) dshash_delete_key(last_start_times, &subid);
    1086         282 : }
    1087             : 
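As an illustration of the use-cases named above, a DDL path that disables a
subscription could clean up like this (a hypothetical sketch; the real callers
live in the subscription DDL code, not in this file):

    /* Hypothetical sketch of a subscription-DDL caller. */
    static void
    disable_subscription_sketch(Oid subid)
    {
        /* ... set pg_subscription.subenabled = false, stop workers ... */

        /*
         * Drop the last-start entry: it would otherwise leak shared
         * memory, and a later re-enable should be able to launch the
         * apply worker immediately instead of waiting out
         * wal_retrieve_retry_interval.
         */
        ApplyLauncherForgetWorkerStartTime(subid);
    }
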
    1088             : /*
    1089             :  * Wake up the launcher on commit if requested.
    1090             :  */
    1091             : void
    1092      739538 : AtEOXact_ApplyLauncher(bool isCommit)
    1093             : {
    1094      739538 :     if (isCommit)
    1095             :     {
    1096      691834 :         if (on_commit_launcher_wakeup)
    1097         254 :             ApplyLauncherWakeup();
    1098             :     }
    1099             : 
    1100      739538 :     on_commit_launcher_wakeup = false;
    1101      739538 : }
    1102             : 
    1103             : /*
    1104             :  * Request wakeup of the launcher on commit of the transaction.
    1105             :  *
    1106             :  * This is used to signal the launcher to stop sleeping and to process the
    1107             :  * subscriptions when the current transaction commits.  It should be used
    1108             :  * when a new tuple has been added to the pg_subscription catalog.
    1109             :  */
    1110             : void
    1111         254 : ApplyLauncherWakeupAtCommit(void)
    1112             : {
    1113         254 :     if (!on_commit_launcher_wakeup)
    1114         254 :         on_commit_launcher_wakeup = true;
    1115         254 : }
    1116             : 
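Putting the two functions above together: subscription DDL requests a deferred
wakeup while its transaction is still open, and commit processing delivers it
(or, on abort, discards it). A hypothetical sketch of the sequence:

    /* Hypothetical sketch of the wakeup-at-commit sequence. */
    static void
    create_subscription_sketch(void)
    {
        /* ... insert the new tuple into pg_subscription ... */

        /* Ask for the launcher to be poked once this transaction commits. */
        ApplyLauncherWakeupAtCommit();

        /*
         * Commit later reaches AtEOXact_ApplyLauncher(true), which calls
         * ApplyLauncherWakeup() and thus sends SIGUSR1 to the launcher;
         * an abort reaches AtEOXact_ApplyLauncher(false), which merely
         * clears the flag so no stale wakeup is delivered.
         */
    }
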
    1117             : static void
    1118        1056 : ApplyLauncherWakeup(void)
    1119             : {
    1120        1056 :     if (LogicalRepCtx->launcher_pid != 0)
    1121         990 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1122        1056 : }
    1123             : 
    1124             : /*
    1125             :  * Main loop for the apply launcher process.
    1126             :  */
    1127             : void
    1128         738 : ApplyLauncherMain(Datum main_arg)
    1129             : {
    1130         738 :     ereport(DEBUG1,
    1131             :             (errmsg_internal("logical replication launcher started")));
    1132             : 
    1133         738 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1134             : 
    1135             :     Assert(LogicalRepCtx->launcher_pid == 0);
    1136         738 :     LogicalRepCtx->launcher_pid = MyProcPid;
    1137             : 
    1138             :     /* Establish signal handlers. */
    1139         738 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1140         738 :     pqsignal(SIGTERM, die);
    1141         738 :     BackgroundWorkerUnblockSignals();
    1142             : 
    1143             :     /*
    1144             :      * Establish connection to nailed catalogs (we only ever access
    1145             :      * pg_subscription).
    1146             :      */
    1147         738 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1148             : 
    1149             :     /* Enter main loop */
    1150             :     for (;;)
    1151        3768 :     {
    1152             :         int         rc;
    1153             :         List       *sublist;
    1154             :         ListCell   *lc;
    1155             :         MemoryContext subctx;
    1156             :         MemoryContext oldctx;
    1157        4506 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1158             : 
    1159        4506 :         CHECK_FOR_INTERRUPTS();
    1160             : 
    1161             :         /* Use temporary context to avoid leaking memory across cycles. */
    1162        4474 :         subctx = AllocSetContextCreate(TopMemoryContext,
    1163             :                                        "Logical Replication Launcher sublist",
    1164             :                                        ALLOCSET_DEFAULT_SIZES);
    1165        4474 :         oldctx = MemoryContextSwitchTo(subctx);
    1166             : 
    1167             :         /* Start any missing workers for enabled subscriptions. */
    1168        4474 :         sublist = get_subscription_list();
    1169        5264 :         foreach(lc, sublist)
    1170             :         {
    1171         794 :             Subscription *sub = (Subscription *) lfirst(lc);
    1172             :             LogicalRepWorker *w;
    1173             :             TimestampTz last_start;
    1174             :             TimestampTz now;
    1175             :             long        elapsed;
    1176             : 
    1177         794 :             if (!sub->enabled)
    1178          72 :                 continue;
    1179             : 
    1180         722 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1181         722 :             w = logicalrep_worker_find(sub->oid, InvalidOid, false);
    1182         722 :             LWLockRelease(LogicalRepWorkerLock);
    1183             : 
    1184         722 :             if (w != NULL)
    1185         406 :                 continue;       /* worker is running already */
    1186             : 
    1187             :             /*
    1188             :              * If the worker is eligible to start now, launch it.  Otherwise,
    1189             :              * adjust wait_time so that we'll wake up as soon as it can be
    1190             :              * started.
    1191             :              *
    1192             :              * Each subscription's apply worker can only be restarted once per
    1193             :              * wal_retrieve_retry_interval, so that errors do not cause us to
    1194             :              * repeatedly restart the worker as fast as possible.  In cases
    1195             :              * where a restart is expected (e.g., subscription parameter
    1196             :              * changes), another process should remove the last-start entry
    1197             :              * for the subscription so that the worker can be restarted
    1198             :              * without waiting for wal_retrieve_retry_interval to elapse.
    1199             :              */
    1200         316 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1201         316 :             now = GetCurrentTimestamp();
    1202         316 :             if (last_start == 0 ||
    1203          86 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1204             :             {
    1205         270 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1206         270 :                 logicalrep_worker_launch(WORKERTYPE_APPLY,
    1207         270 :                                          sub->dbid, sub->oid, sub->name,
    1208             :                                          sub->owner, InvalidOid,
    1209             :                                          DSM_HANDLE_INVALID);
    1210             :             }
    1211             :             else
    1212             :             {
    1213          46 :                 wait_time = Min(wait_time,
    1214             :                                 wal_retrieve_retry_interval - elapsed);
    1215             :             }
    1216             :         }
    1217             : 
    1218             :         /* Switch back to original memory context. */
    1219        4470 :         MemoryContextSwitchTo(oldctx);
    1220             :         /* Clean the temporary memory. */
    1221        4470 :         MemoryContextDelete(subctx);
    1222             : 
    1223             :         /* Wait for more work. */
    1224        4470 :         rc = WaitLatch(MyLatch,
    1225             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1226             :                        wait_time,
    1227             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1228             : 
    1229        4466 :         if (rc & WL_LATCH_SET)
    1230             :         {
    1231        4450 :             ResetLatch(MyLatch);
    1232        4450 :             CHECK_FOR_INTERRUPTS();
    1233             :         }
    1234             : 
    1235        3768 :         if (ConfigReloadPending)
    1236             :         {
    1237          58 :             ConfigReloadPending = false;
    1238          58 :             ProcessConfigFile(PGC_SIGHUP);
    1239             :         }
    1240             :     }
    1241             : 
    1242             :     /* Not reachable */
    1243             : }
    1244             : 
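To make the per-subscription retry throttling in the loop above concrete, here
is a worked example, assuming wal_retrieve_retry_interval is at its default of
5000 ms:

    /*
     * Suppose an apply worker crashed, and its subscription's last-start
     * entry says it was launched 2000 ms ago:
     *
     *   elapsed   = TimestampDifferenceMilliseconds(last_start, now)
     *             = 2000
     *   2000 < 5000, so the worker is not relaunched this cycle, and
     *   wait_time = Min(180000, 5000 - 2000) = 3000
     *
     * The launcher therefore sleeps at most 3 s before reconsidering,
     * rather than busy-restarting a crashing worker or napping for the
     * full DEFAULT_NAPTIME_PER_CYCLE (3 min).
     */
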
    1245             : /*
    1246             :  * Is current process the logical replication launcher?
    1247             :  */
    1248             : bool
    1249         772 : IsLogicalLauncher(void)
    1250             : {
    1251         772 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1252             : }
    1253             : 
    1254             : /*
    1255             :  * Return the pid of the leader apply worker if the given pid is the pid of a
    1256             :  * parallel apply worker; otherwise, return InvalidPid.
    1257             :  */
    1258             : pid_t
    1259        1472 : GetLeaderApplyWorkerPid(pid_t pid)
    1260             : {
    1261        1472 :     int         leader_pid = InvalidPid;
    1262             :     int         i;
    1263             : 
    1264        1472 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1265             : 
    1266        7360 :     for (i = 0; i < max_logical_replication_workers; i++)
    1267             :     {
    1268        5888 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1269             : 
    1270        5888 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
    1271             :         {
    1272           0 :             leader_pid = w->leader_pid;
    1273           0 :             break;
    1274             :         }
    1275             :     }
    1276             : 
    1277        1472 :     LWLockRelease(LogicalRepWorkerLock);
    1278             : 
    1279        1472 :     return leader_pid;
    1280             : }
    1281             : 
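As a usage note, this is the sort of lookup a monitoring caller performs to
attribute a parallel apply worker to its leader (a hypothetical sketch; the
actual call site is in the statistics code, outside this file):

    /* Hypothetical sketch: log a pid's leader, if it has one. */
    static void
    report_leader_sketch(pid_t pid)
    {
        pid_t       leader = GetLeaderApplyWorkerPid(pid);

        if (leader != InvalidPid)
            elog(DEBUG1, "pid %d is a parallel apply worker of leader %d",
                 (int) pid, (int) leader);
    }
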
    1282             : /*
    1283             :  * Returns the state of each subscription's workers.
    1284             :  */
    1285             : Datum
    1286           2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1287             : {
    1288             : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
    1289           2 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1290             :     int         i;
    1291           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1292             : 
    1293           2 :     InitMaterializedSRF(fcinfo, 0);
    1294             : 
    1295             :     /* Make sure we get a consistent view of the workers. */
    1296           2 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1297             : 
    1298          10 :     for (i = 0; i < max_logical_replication_workers; i++)
    1299             :     {
    1300             :         /* for each row */
    1301           8 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1302           8 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1303             :         int         worker_pid;
    1304             :         LogicalRepWorker worker;
    1305             : 
    1306           8 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1307             :                sizeof(LogicalRepWorker));
    1308           8 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1309           4 :             continue;
    1310             : 
    1311           4 :         if (OidIsValid(subid) && worker.subid != subid)
    1312           0 :             continue;
    1313             : 
    1314           4 :         worker_pid = worker.proc->pid;
    1315             : 
    1316           4 :         values[0] = ObjectIdGetDatum(worker.subid);
    1317           4 :         if (isTablesyncWorker(&worker))
    1318           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1319             :         else
    1320           4 :             nulls[1] = true;
    1321           4 :         values[2] = Int32GetDatum(worker_pid);
    1322             : 
    1323           4 :         if (isParallelApplyWorker(&worker))
    1324           0 :             values[3] = Int32GetDatum(worker.leader_pid);
    1325             :         else
    1326           4 :             nulls[3] = true;
    1327             : 
    1328           4 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
    1329           0 :             nulls[4] = true;
    1330             :         else
    1331           4 :             values[4] = LSNGetDatum(worker.last_lsn);
    1332           4 :         if (worker.last_send_time == 0)
    1333           0 :             nulls[5] = true;
    1334             :         else
    1335           4 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
    1336           4 :         if (worker.last_recv_time == 0)
    1337           0 :             nulls[6] = true;
    1338             :         else
    1339           4 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1340           4 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1341           0 :             nulls[7] = true;
    1342             :         else
    1343           4 :             values[7] = LSNGetDatum(worker.reply_lsn);
    1344           4 :         if (worker.reply_time == 0)
    1345           0 :             nulls[8] = true;
    1346             :         else
    1347           4 :             values[8] = TimestampTzGetDatum(worker.reply_time);
    1348             : 
    1349           4 :         switch (worker.type)
    1350             :         {
    1351           4 :             case WORKERTYPE_APPLY:
    1352           4 :                 values[9] = CStringGetTextDatum("apply");
    1353           4 :                 break;
    1354           0 :             case WORKERTYPE_PARALLEL_APPLY:
    1355           0 :                 values[9] = CStringGetTextDatum("parallel apply");
    1356           0 :                 break;
    1357           0 :             case WORKERTYPE_TABLESYNC:
    1358           0 :                 values[9] = CStringGetTextDatum("table synchronization");
    1359           0 :                 break;
    1360           0 :             case WORKERTYPE_UNKNOWN:
    1361             :                 /* Should never happen. */
    1362           0 :                 elog(ERROR, "unknown worker type");
    1363             :         }
    1364             : 
    1365           4 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1366             :                              values, nulls);
    1367             : 
    1368             :         /*
    1369             :          * If only a single subscription was requested, and we found it,
    1370             :          * break.
    1371             :          */
    1372           4 :         if (OidIsValid(subid))
    1373           0 :             break;
    1374             :     }
    1375             : 
    1376           2 :     LWLockRelease(LogicalRepWorkerLock);
    1377             : 
    1378           2 :     return (Datum) 0;
    1379             : }
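
Finally, for orientation: this set-returning function backs the
pg_stat_subscription system view, so its ten columns (values[0] through
values[9] above) can be inspected from SQL:

    /*
     * Usage from SQL, e.g.:
     *
     *   SELECT * FROM pg_stat_get_subscription(NULL);  -- one row per worker
     *   SELECT * FROM pg_stat_subscription;            -- the system view
     *
     * Passing a subscription OID instead of NULL restricts the result to
     * that subscription's workers.
     */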

Generated by: LCOV version 1.14