LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions)
Test: PostgreSQL 18devel
Date: 2024-11-21 08:14:44
Coverage: Lines: 410 of 465 hit (88.2 %), Functions: 31 of 31 hit (100.0 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2024, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
      11             :  *    This module contains the logical replication worker launcher which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "access/heapam.h"
      21             : #include "access/htup.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/pg_subscription.h"
      26             : #include "catalog/pg_subscription_rel.h"
      27             : #include "funcapi.h"
      28             : #include "lib/dshash.h"
      29             : #include "miscadmin.h"
      30             : #include "pgstat.h"
      31             : #include "postmaster/bgworker.h"
      32             : #include "postmaster/interrupt.h"
      33             : #include "replication/logicallauncher.h"
      34             : #include "replication/slot.h"
      35             : #include "replication/walreceiver.h"
      36             : #include "replication/worker_internal.h"
      37             : #include "storage/ipc.h"
      38             : #include "storage/proc.h"
      39             : #include "storage/procarray.h"
      40             : #include "tcop/tcopprot.h"
      41             : #include "utils/builtins.h"
      42             : #include "utils/memutils.h"
      43             : #include "utils/pg_lsn.h"
      44             : #include "utils/snapmgr.h"
      45             : 
      46             : /* max sleep time between cycles (3min) */
      47             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      48             : 
      49             : /* GUC variables */
      50             : int         max_logical_replication_workers = 4;
      51             : int         max_sync_workers_per_subscription = 2;
      52             : int         max_parallel_apply_workers_per_subscription = 2;
      53             : 
      54             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      55             : 
      56             : typedef struct LogicalRepCtxStruct
      57             : {
      58             :     /* Supervisor process. */
      59             :     pid_t       launcher_pid;
      60             : 
      61             :     /* Hash table holding last start times of subscriptions' apply workers. */
      62             :     dsa_handle  last_start_dsa;
      63             :     dshash_table_handle last_start_dsh;
      64             : 
      65             :     /* Background workers. */
      66             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      67             : } LogicalRepCtxStruct;
      68             : 
      69             : static LogicalRepCtxStruct *LogicalRepCtx;
      70             : 
      71             : /* an entry in the last-start-times shared hash table */
      72             : typedef struct LauncherLastStartTimesEntry
      73             : {
      74             :     Oid         subid;          /* OID of logrep subscription (hash key) */
      75             :     TimestampTz last_start_time;    /* last time its apply worker was started */
      76             : } LauncherLastStartTimesEntry;
      77             : 
      78             : /* parameters for the last-start-times shared hash table */
      79             : static const dshash_parameters dsh_params = {
      80             :     sizeof(Oid),
      81             :     sizeof(LauncherLastStartTimesEntry),
      82             :     dshash_memcmp,
      83             :     dshash_memhash,
      84             :     dshash_memcpy,
      85             :     LWTRANCHE_LAUNCHER_HASH
      86             : };
      87             : 
      88             : static dsa_area *last_start_times_dsa = NULL;
      89             : static dshash_table *last_start_times = NULL;
      90             : 
      91             : static bool on_commit_launcher_wakeup = false;
      92             : 
      93             : 
      94             : static void ApplyLauncherWakeup(void);
      95             : static void logicalrep_launcher_onexit(int code, Datum arg);
      96             : static void logicalrep_worker_onexit(int code, Datum arg);
      97             : static void logicalrep_worker_detach(void);
      98             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
      99             : static int  logicalrep_pa_worker_count(Oid subid);
     100             : static void logicalrep_launcher_attach_dshmem(void);
     101             : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
     102             : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
     103             : 
     104             : 
     105             : /*
     106             :  * Load the list of subscriptions.
     107             :  *
     108             :  * Only the fields of interest to the worker start/stop functions are filled
     109             :  * in for each subscription.
     110             :  */
     111             : static List *
     112        4422 : get_subscription_list(void)
     113             : {
     114        4422 :     List       *res = NIL;
     115             :     Relation    rel;
     116             :     TableScanDesc scan;
     117             :     HeapTuple   tup;
     118             :     MemoryContext resultcxt;
     119             : 
     120             :     /* This is the context that we will allocate our output data in */
     121        4422 :     resultcxt = CurrentMemoryContext;
     122             : 
     123             :     /*
     124             :      * Start a transaction so we can access pg_subscription, and get a snapshot.
     125             :      * We don't have a use for the snapshot itself, but we're interested in
     126             :      * the secondary effect that it sets RecentGlobalXmin.  (This is critical
     127             :      * for anything that reads heap pages, because HOT may decide to prune
     128             :      * them even if the process doesn't attempt to modify any tuples.)
     129             :      *
     130             :      * FIXME: This comment is inaccurate / the code buggy. A snapshot that is
     131             :      * not pushed/active does not reliably prevent HOT pruning (->xmin could
     132             :      * e.g. be cleared when cache invalidations are processed).
     133             :      */
     134        4422 :     StartTransactionCommand();
     135        4422 :     (void) GetTransactionSnapshot();
     136             : 
     137        4422 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     138        4422 :     scan = table_beginscan_catalog(rel, 0, NULL);
     139             : 
     140        5214 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     141             :     {
     142         792 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     143             :         Subscription *sub;
     144             :         MemoryContext oldcxt;
     145             : 
     146             :         /*
     147             :          * Allocate our results in the caller's context, not the
     148             :          * transaction's. We do this inside the loop, and restore the original
     149             :          * context at the end, so that leaky things like heap_getnext() are
     150             :          * not called in a potentially long-lived context.
     151             :          */
     152         792 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     153             : 
     154         792 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     155         792 :         sub->oid = subform->oid;
     156         792 :         sub->dbid = subform->subdbid;
     157         792 :         sub->owner = subform->subowner;
     158         792 :         sub->enabled = subform->subenabled;
     159         792 :         sub->name = pstrdup(NameStr(subform->subname));
     160             :         /* We don't fill fields we are not interested in. */
     161             : 
     162         792 :         res = lappend(res, sub);
     163         792 :         MemoryContextSwitchTo(oldcxt);
     164             :     }
     165             : 
     166        4422 :     table_endscan(scan);
     167        4422 :     table_close(rel, AccessShareLock);
     168             : 
     169        4422 :     CommitTransactionCommand();
     170             : 
     171        4422 :     return res;
     172             : }
     173             : 
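A minimal usage sketch, modeled on how the launcher's main loop consumes this
list; the loop body here is illustrative rather than the exact launcher logic:

    List       *sublist = get_subscription_list();
    ListCell   *lc;

    foreach(lc, sublist)
    {
        Subscription *sub = (Subscription *) lfirst(lc);

        /* Only enabled subscriptions get an apply worker. */
        if (!sub->enabled)
            continue;

        /* ... decide whether to (re)start the apply worker for sub->oid ... */
    }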
     174             : /*
     175             :  * Wait for a background worker to start up and attach to the shmem context.
     176             :  *
     177             :  * This is only needed for cleaning up the shared memory in case the worker
     178             :  * fails to attach.
     179             :  *
     180             :  * Returns whether the attach was successful.
     181             :  */
     182             : static bool
     183        3370 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     184             :                                uint16 generation,
     185             :                                BackgroundWorkerHandle *handle)
     186             : {
     187             :     BgwHandleStatus status;
     188             :     int         rc;
     189             : 
     190             :     for (;;)
     191        2740 :     {
     192             :         pid_t       pid;
     193             : 
     194        3370 :         CHECK_FOR_INTERRUPTS();
     195             : 
     196        3370 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     197             : 
     198             :         /* Worker either died or has started. Return false if died. */
     199        3370 :         if (!worker->in_use || worker->proc)
     200             :         {
     201         626 :             LWLockRelease(LogicalRepWorkerLock);
     202         626 :             return worker->in_use;
     203             :         }
     204             : 
     205        2744 :         LWLockRelease(LogicalRepWorkerLock);
     206             : 
     207             :         /* Check if worker has died before attaching, and clean up after it. */
     208        2744 :         status = GetBackgroundWorkerPid(handle, &pid);
     209             : 
     210        2744 :         if (status == BGWH_STOPPED)
     211             :         {
     212           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     213             :             /* Ensure that this was indeed the worker we waited for. */
     214           0 :             if (generation == worker->generation)
     215           0 :                 logicalrep_worker_cleanup(worker);
     216           0 :             LWLockRelease(LogicalRepWorkerLock);
     217           0 :             return false;
     218             :         }
     219             : 
     220             :         /*
     221             :          * We need a timeout because we generally don't get notified via the
     222             :          * latch about the worker attaching.  But we don't expect to wait long.
     223             :          */
     224        2744 :         rc = WaitLatch(MyLatch,
     225             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     226             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     227             : 
     228        2744 :         if (rc & WL_LATCH_SET)
     229             :         {
     230         820 :             ResetLatch(MyLatch);
     231         820 :             CHECK_FOR_INTERRUPTS();
     232             :         }
     233             :     }
     234             : }
     235             : 
     236             : /*
     237             :  * Walks the workers array and searches for one that matches the given
     238             :  * subscription id and relid.
     239             :  *
     240             :  * We are only interested in the leader apply worker or table sync worker.
     241             :  */
     242             : LogicalRepWorker *
     243        4198 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     244             : {
     245             :     int         i;
     246        4198 :     LogicalRepWorker *res = NULL;
     247             : 
     248             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     249             : 
     250             :     /* Search for attached worker for a given subscription id. */
     251       12760 :     for (i = 0; i < max_logical_replication_workers; i++)
     252             :     {
     253       11190 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     254             : 
     255             :         /* Skip parallel apply workers. */
     256       11190 :         if (isParallelApplyWorker(w))
     257           0 :             continue;
     258             : 
     259       11190 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     260        2628 :             (!only_running || w->proc))
     261             :         {
     262        2628 :             res = w;
     263        2628 :             break;
     264             :         }
     265             :     }
     266             : 
     267        4198 :     return res;
     268             : }
     269             : 
     270             : /*
     271             :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     272             :  * the subscription, instead of just one.
     273             :  */
     274             : List *
     275         950 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
     276             : {
     277             :     int         i;
     278         950 :     List       *res = NIL;
     279             : 
     280         950 :     if (acquire_lock)
     281         190 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     282             : 
     283             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     284             : 
     285             :     /* Search for attached worker for a given subscription id. */
     286        4938 :     for (i = 0; i < max_logical_replication_workers; i++)
     287             :     {
     288        3988 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     289             : 
     290        3988 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     291         622 :             res = lappend(res, w);
     292             :     }
     293             : 
     294         950 :     if (acquire_lock)
     295         190 :         LWLockRelease(LogicalRepWorkerLock);
     296             : 
     297         950 :     return res;
     298             : }
     299             : 
     300             : /*
     301             :  * Start new logical replication background worker, if possible.
     302             :  *
     303             :  * Returns true on success, false on failure.
     304             :  */
     305             : bool
     306         630 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
     307             :                          Oid dbid, Oid subid, const char *subname, Oid userid,
     308             :                          Oid relid, dsm_handle subworker_dsm)
     309             : {
     310             :     BackgroundWorker bgw;
     311             :     BackgroundWorkerHandle *bgw_handle;
     312             :     uint16      generation;
     313             :     int         i;
     314         630 :     int         slot = 0;
     315         630 :     LogicalRepWorker *worker = NULL;
     316             :     int         nsyncworkers;
     317             :     int         nparallelapplyworkers;
     318             :     TimestampTz now;
     319         630 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
     320         630 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
     321             : 
     322             :     /*----------
     323             :      * Sanity checks:
     324             :      * - must be valid worker type
     325             :      * - tablesync workers are only ones to have relid
     326             :      * - parallel apply worker is the only kind of subworker
     327             :      */
     328             :     Assert(wtype != WORKERTYPE_UNKNOWN);
     329             :     Assert(is_tablesync_worker == OidIsValid(relid));
     330             :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
     331             : 
     332         630 :     ereport(DEBUG1,
     333             :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     334             :                              subname)));
     335             : 
     336             :     /* Report this after the initial starting message for consistency. */
     337         630 :     if (max_replication_slots == 0)
     338           0 :         ereport(ERROR,
     339             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     340             :                  errmsg("cannot start logical replication workers when \"max_replication_slots\"=0")));
     341             : 
     342             :     /*
     343             :      * We need to modify the shared memory under lock so that we have a
     344             :      * consistent view.
     345             :      */
     346         630 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     347             : 
     348         630 : retry:
     349             :     /* Find unused worker slot. */
     350        1130 :     for (i = 0; i < max_logical_replication_workers; i++)
     351             :     {
     352        1130 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     353             : 
     354        1130 :         if (!w->in_use)
     355             :         {
     356         630 :             worker = w;
     357         630 :             slot = i;
     358         630 :             break;
     359             :         }
     360             :     }
     361             : 
     362         630 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     363             : 
     364         630 :     now = GetCurrentTimestamp();
     365             : 
     366             :     /*
     367             :      * If we didn't find a free slot, try to do garbage collection.  We do
     368             :      * this because if some worker failed to start up and its parent crashed
     369             :      * while waiting, the in_use state was never cleared.
     370             :      */
     371         630 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     372             :     {
     373           0 :         bool        did_cleanup = false;
     374             : 
     375           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     376             :         {
     377           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     378             : 
     379             :             /*
     380             :              * If the worker was marked in use but didn't manage to attach in
     381             :              * time, clean it up.
     382             :              */
     383           0 :             if (w->in_use && !w->proc &&
     384           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     385             :                                            wal_receiver_timeout))
     386             :             {
     387           0 :                 elog(WARNING,
     388             :                      "logical replication worker for subscription %u took too long to start; canceled",
     389             :                      w->subid);
     390             : 
     391           0 :                 logicalrep_worker_cleanup(w);
     392           0 :                 did_cleanup = true;
     393             :             }
     394             :         }
     395             : 
     396           0 :         if (did_cleanup)
     397           0 :             goto retry;
     398             :     }
     399             : 
     400             :     /*
     401             :      * We don't allow starting more sync workers once we have reached the
     402             :      * sync worker limit per subscription. So, just return silently, as we
     403             :      * might get here because of an otherwise harmless race condition.
     404             :      */
     405         630 :     if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
     406             :     {
     407           0 :         LWLockRelease(LogicalRepWorkerLock);
     408           0 :         return false;
     409             :     }
     410             : 
     411         630 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     412             : 
     413             :     /*
     414             :      * Return false if the number of parallel apply workers reached the limit
     415             :      * per subscription.
     416             :      */
     417         630 :     if (is_parallel_apply_worker &&
     418          20 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     419             :     {
     420           0 :         LWLockRelease(LogicalRepWorkerLock);
     421           0 :         return false;
     422             :     }
     423             : 
     424             :     /*
     425             :      * However, if there are no more free worker slots, inform the user
     426             :      * before exiting.
     427             :      */
     428         630 :     if (worker == NULL)
     429             :     {
     430           0 :         LWLockRelease(LogicalRepWorkerLock);
     431           0 :         ereport(WARNING,
     432             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     433             :                  errmsg("out of logical replication worker slots"),
     434             :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
     435           0 :         return false;
     436             :     }
     437             : 
     438             :     /* Prepare the worker slot. */
     439         630 :     worker->type = wtype;
     440         630 :     worker->launch_time = now;
     441         630 :     worker->in_use = true;
     442         630 :     worker->generation++;
     443         630 :     worker->proc = NULL;
     444         630 :     worker->dbid = dbid;
     445         630 :     worker->userid = userid;
     446         630 :     worker->subid = subid;
     447         630 :     worker->relid = relid;
     448         630 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     449         630 :     worker->relstate_lsn = InvalidXLogRecPtr;
     450         630 :     worker->stream_fileset = NULL;
     451         630 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     452         630 :     worker->parallel_apply = is_parallel_apply_worker;
     453         630 :     worker->last_lsn = InvalidXLogRecPtr;
     454         630 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     455         630 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     456         630 :     worker->reply_lsn = InvalidXLogRecPtr;
     457         630 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     458             : 
     459             :     /* Before releasing lock, remember generation for future identification. */
     460         630 :     generation = worker->generation;
     461             : 
     462         630 :     LWLockRelease(LogicalRepWorkerLock);
     463             : 
     464             :     /* Register the new dynamic worker. */
     465         630 :     memset(&bgw, 0, sizeof(bgw));
     466         630 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     467             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     468         630 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     469         630 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     470             : 
     471         630 :     switch (worker->type)
     472             :     {
     473         268 :         case WORKERTYPE_APPLY:
     474         268 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     475         268 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     476             :                      "logical replication apply worker for subscription %u",
     477             :                      subid);
     478         268 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
     479         268 :             break;
     480             : 
     481          20 :         case WORKERTYPE_PARALLEL_APPLY:
     482          20 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     483          20 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     484             :                      "logical replication parallel apply worker for subscription %u",
     485             :                      subid);
     486          20 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     487             : 
     488          20 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     489          20 :             break;
     490             : 
     491         342 :         case WORKERTYPE_TABLESYNC:
     492         342 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
     493         342 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     494             :                      "logical replication tablesync worker for subscription %u sync %u",
     495             :                      subid,
     496             :                      relid);
     497         342 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
     498         342 :             break;
     499             : 
     500           0 :         case WORKERTYPE_UNKNOWN:
     501             :             /* Should never happen. */
     502           0 :             elog(ERROR, "unknown worker type");
     503             :     }
     504             : 
     505         630 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     506         630 :     bgw.bgw_notify_pid = MyProcPid;
     507         630 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     508             : 
     509         630 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     510             :     {
     511             :         /* Failed to start worker, so clean up the worker slot. */
     512           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     513             :         Assert(generation == worker->generation);
     514           0 :         logicalrep_worker_cleanup(worker);
     515           0 :         LWLockRelease(LogicalRepWorkerLock);
     516             : 
     517           0 :         ereport(WARNING,
     518             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     519             :                  errmsg("out of background worker slots"),
     520             :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
     521           0 :         return false;
     522             :     }
     523             : 
     524             :     /* Now wait until it attaches. */
     525         630 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     526             : }
     527             : 
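A minimal sketch of a call site, modeled on how the launcher starts a leader
apply worker; sub is assumed to come from get_subscription_list():

    /* No relid (not a tablesync worker), no DSM (not a parallel apply worker). */
    if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
                                  sub->dbid, sub->oid, sub->name, sub->owner,
                                  InvalidOid, DSM_HANDLE_INVALID))
    {
        /* Could not start; the launcher will retry on a later cycle. */
    }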
     528             : /*
     529             :  * Internal function to stop the worker and wait until it detaches from the
     530             :  * slot.
     531             :  */
     532             : static void
     533         134 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
     534             : {
     535             :     uint16      generation;
     536             : 
     537             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
     538             : 
     539             :     /*
     540             :      * Remember the worker's generation so we can later check whether the
     541             :      * slot still holds the same worker.
     542             :      */
     543         134 :     generation = worker->generation;
     544             : 
     545             :     /*
     546             :      * If we found a worker but it does not have proc set, then it is still
     547             :      * starting up; wait for it to finish starting and then kill it.
     548             :      */
     549         164 :     while (worker->in_use && !worker->proc)
     550             :     {
     551             :         int         rc;
     552             : 
     553          40 :         LWLockRelease(LogicalRepWorkerLock);
     554             : 
     555             :         /* Wait a bit --- we don't expect to have to wait long. */
     556          40 :         rc = WaitLatch(MyLatch,
     557             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     558             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     559             : 
     560          40 :         if (rc & WL_LATCH_SET)
     561             :         {
     562           0 :             ResetLatch(MyLatch);
     563           0 :             CHECK_FOR_INTERRUPTS();
     564             :         }
     565             : 
     566             :         /* Recheck worker status. */
     567          40 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     568             : 
     569             :         /*
     570             :          * Check whether the worker slot is no longer used, which would mean
     571             :          * that the worker has exited, or whether the worker generation is
     572             :          * different, meaning that a different worker has taken the slot.
     573             :          */
     574          40 :         if (!worker->in_use || worker->generation != generation)
     575           0 :             return;
     576             : 
     577             :         /* Worker has assigned proc, so it has started. */
     578          40 :         if (worker->proc)
     579          10 :             break;
     580             :     }
     581             : 
     582             :     /* Now terminate the worker ... */
     583         134 :     kill(worker->proc->pid, signo);
     584             : 
     585             :     /* ... and wait for it to die. */
     586             :     for (;;)
     587         174 :     {
     588             :         int         rc;
     589             : 
     590             :         /* is it gone? */
     591         308 :         if (!worker->proc || worker->generation != generation)
     592             :             break;
     593             : 
     594         174 :         LWLockRelease(LogicalRepWorkerLock);
     595             : 
     596             :         /* Wait a bit --- we don't expect to have to wait long. */
     597         174 :         rc = WaitLatch(MyLatch,
     598             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     599             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     600             : 
     601         174 :         if (rc & WL_LATCH_SET)
     602             :         {
     603          42 :             ResetLatch(MyLatch);
     604          42 :             CHECK_FOR_INTERRUPTS();
     605             :         }
     606             : 
     607         174 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     608             :     }
     609             : }
     610             : 
     611             : /*
     612             :  * Stop the logical replication worker for subid/relid, if any.
     613             :  */
     614             : void
     615         154 : logicalrep_worker_stop(Oid subid, Oid relid)
     616             : {
     617             :     LogicalRepWorker *worker;
     618             : 
     619         154 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     620             : 
     621         154 :     worker = logicalrep_worker_find(subid, relid, false);
     622             : 
     623         154 :     if (worker)
     624             :     {
     625             :         Assert(!isParallelApplyWorker(worker));
     626         118 :         logicalrep_worker_stop_internal(worker, SIGTERM);
     627             :     }
     628             : 
     629         154 :     LWLockRelease(LogicalRepWorkerLock);
     630         154 : }
     631             : 
     632             : /*
     633             :  * Stop the given logical replication parallel apply worker.
     634             :  *
     635             :  * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
     636             :  * worker so that the worker exits cleanly.
     637             :  */
     638             : void
     639          10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
     640             : {
     641             :     int         slot_no;
     642             :     uint16      generation;
     643             :     LogicalRepWorker *worker;
     644             : 
     645          10 :     SpinLockAcquire(&winfo->shared->mutex);
     646          10 :     generation = winfo->shared->logicalrep_worker_generation;
     647          10 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
     648          10 :     SpinLockRelease(&winfo->shared->mutex);
     649             : 
     650             :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
     651             : 
     652             :     /*
     653             :      * Detach from the error_mq_handle for the parallel apply worker before
     654             :      * stopping it. This prevents the leader apply worker from trying to
     655             :      * receive a message from an error queue that might already have been
     656             :      * detached by the parallel apply worker.
     657             :      */
     658          10 :     if (winfo->error_mq_handle)
     659             :     {
     660          10 :         shm_mq_detach(winfo->error_mq_handle);
     661          10 :         winfo->error_mq_handle = NULL;
     662             :     }
     663             : 
     664          10 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     665             : 
     666          10 :     worker = &LogicalRepCtx->workers[slot_no];
     667             :     Assert(isParallelApplyWorker(worker));
     668             : 
     669             :     /*
     670             :      * Only stop the worker if the generation matches and the worker is alive.
     671             :      */
     672          10 :     if (worker->generation == generation && worker->proc)
     673          10 :         logicalrep_worker_stop_internal(worker, SIGINT);
     674             : 
     675          10 :     LWLockRelease(LogicalRepWorkerLock);
     676          10 : }
     677             : 
     678             : /*
     679             :  * Wake up (using latch) any logical replication worker for the specified sub/rel.
     680             :  */
     681             : void
     682         382 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     683             : {
     684             :     LogicalRepWorker *worker;
     685             : 
     686         382 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     687             : 
     688         382 :     worker = logicalrep_worker_find(subid, relid, true);
     689             : 
     690         382 :     if (worker)
     691         382 :         logicalrep_worker_wakeup_ptr(worker);
     692             : 
     693         382 :     LWLockRelease(LogicalRepWorkerLock);
     694         382 : }
     695             : 
     696             : /*
     697             :  * Wake up (using latch) the specified logical replication worker.
     698             :  *
     699             :  * Caller must hold lock, else worker->proc could change under us.
     700             :  */
     701             : void
     702        1136 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     703             : {
     704             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     705             : 
     706        1136 :     SetLatch(&worker->proc->procLatch);
     707        1136 : }
     708             : 
     709             : /*
     710             :  * Attach to a slot.
     711             :  */
     712             : void
     713         764 : logicalrep_worker_attach(int slot)
     714             : {
     715             :     /* Block concurrent access. */
     716         764 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     717             : 
     718             :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     719         764 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     720             : 
     721         764 :     if (!MyLogicalRepWorker->in_use)
     722             :     {
     723           0 :         LWLockRelease(LogicalRepWorkerLock);
     724           0 :         ereport(ERROR,
     725             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     726             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     727             :                         slot)));
     728             :     }
     729             : 
     730         764 :     if (MyLogicalRepWorker->proc)
     731             :     {
     732           0 :         LWLockRelease(LogicalRepWorkerLock);
     733           0 :         ereport(ERROR,
     734             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     735             :                  errmsg("logical replication worker slot %d is already used by "
     736             :                         "another worker, cannot attach", slot)));
     737             :     }
     738             : 
     739         764 :     MyLogicalRepWorker->proc = MyProc;
     740         764 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     741             : 
     742         764 :     LWLockRelease(LogicalRepWorkerLock);
     743         764 : }
     744             : 
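The slot index travels to the worker via bgw_main_arg (set in
logicalrep_worker_launch above), so a worker entry point attaches roughly like
this; a simplified sketch, as the real ApplyWorkerMain performs additional
setup around this step:

    void
    ApplyWorkerMain(Datum main_arg)
    {
        int         worker_slot = DatumGetInt32(main_arg);

        /* Claim the LogicalRepWorker slot reserved by the launcher. */
        logicalrep_worker_attach(worker_slot);

        /* ... connect to the publisher and enter the apply loop ... */
    }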
     745             : /*
     746             :  * Stop the parallel apply workers if any, and detach the leader apply worker
     747             :  * (cleans up the worker info).
     748             :  */
     749             : static void
     750         764 : logicalrep_worker_detach(void)
     751             : {
     752             :     /* Stop the parallel apply workers. */
     753         764 :     if (am_leader_apply_worker())
     754             :     {
     755             :         List       *workers;
     756             :         ListCell   *lc;
     757             : 
     758             :         /*
     759             :          * Detach from the error_mq_handle for all parallel apply workers
     760             :          * before terminating them. This prevents the leader apply worker from
     761             :          * receiving the worker termination message and logging it when the
     762             :          * parallel apply worker has already done so itself.
     763             :          */
     764         398 :         pa_detach_all_error_mq();
     765             : 
     766         398 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     767             : 
     768         398 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
     769         802 :         foreach(lc, workers)
     770             :         {
     771         404 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     772             : 
     773         404 :             if (isParallelApplyWorker(w))
     774           6 :                 logicalrep_worker_stop_internal(w, SIGTERM);
     775             :         }
     776             : 
     777         398 :         LWLockRelease(LogicalRepWorkerLock);
     778             :     }
     779             : 
     780             :     /* Block concurrent access. */
     781         764 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     782             : 
     783         764 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     784             : 
     785         764 :     LWLockRelease(LogicalRepWorkerLock);
     786         764 : }
     787             : 
     788             : /*
     789             :  * Clean up worker info.
     790             :  */
     791             : static void
     792         764 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     793             : {
     794             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     795             : 
     796         764 :     worker->type = WORKERTYPE_UNKNOWN;
     797         764 :     worker->in_use = false;
     798         764 :     worker->proc = NULL;
     799         764 :     worker->dbid = InvalidOid;
     800         764 :     worker->userid = InvalidOid;
     801         764 :     worker->subid = InvalidOid;
     802         764 :     worker->relid = InvalidOid;
     803         764 :     worker->leader_pid = InvalidPid;
     804         764 :     worker->parallel_apply = false;
     805         764 : }
     806             : 
     807             : /*
     808             :  * Cleanup function for logical replication launcher.
     809             :  *
     810             :  * Called on logical replication launcher exit.
     811             :  */
     812             : static void
     813         740 : logicalrep_launcher_onexit(int code, Datum arg)
     814             : {
     815         740 :     LogicalRepCtx->launcher_pid = 0;
     816         740 : }
     817             : 
     818             : /*
     819             :  * Cleanup function.
     820             :  *
     821             :  * Called on logical replication worker exit.
     822             :  */
     823             : static void
     824         764 : logicalrep_worker_onexit(int code, Datum arg)
     825             : {
     826             :     /* Disconnect gracefully from the remote side. */
     827         764 :     if (LogRepWorkerWalRcvConn)
     828         686 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
     829             : 
     830         764 :     logicalrep_worker_detach();
     831             : 
     832             :     /* Cleanup fileset used for streaming transactions. */
     833         764 :     if (MyLogicalRepWorker->stream_fileset != NULL)
     834          28 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     835             : 
     836             :     /*
     837             :      * Session level locks may be acquired outside of a transaction in
     838             :      * parallel apply mode and will not be released when the worker
     839             :      * terminates, so manually release all locks before the worker exits.
     840             :      *
     841             :      * The locks will be acquired once the worker is initialized.
     842             :      */
     843         764 :     if (!InitializingApplyWorker)
     844         754 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
     845             : 
     846         764 :     ApplyLauncherWakeup();
     847         764 : }
     848             : 
     849             : /*
     850             :  * Count the number of registered (not necessarily running) sync workers
     851             :  * for a subscription.
     852             :  */
     853             : int
     854        1852 : logicalrep_sync_worker_count(Oid subid)
     855             : {
     856             :     int         i;
     857        1852 :     int         res = 0;
     858             : 
     859             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     860             : 
     861             :     /* Search for attached worker for a given subscription id. */
     862        9596 :     for (i = 0; i < max_logical_replication_workers; i++)
     863             :     {
     864        7744 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     865             : 
     866        7744 :         if (isTablesyncWorker(w) && w->subid == subid)
     867        1968 :             res++;
     868             :     }
     869             : 
     870        1852 :     return res;
     871             : }
     872             : 
     873             : /*
     874             :  * Count the number of registered (but not necessarily running) parallel apply
     875             :  * workers for a subscription.
     876             :  */
     877             : static int
     878         630 : logicalrep_pa_worker_count(Oid subid)
     879             : {
     880             :     int         i;
     881         630 :     int         res = 0;
     882             : 
     883             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     884             : 
     885             :     /*
     886             :      * Scan all attached parallel apply workers, only counting those which
     887             :      * have the given subscription id.
     888             :      */
     889        3338 :     for (i = 0; i < max_logical_replication_workers; i++)
     890             :     {
     891        2708 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     892             : 
     893        2708 :         if (isParallelApplyWorker(w) && w->subid == subid)
     894           4 :             res++;
     895             :     }
     896             : 
     897         630 :     return res;
     898             : }
     899             : 
     900             : /*
     901             :  * ApplyLauncherShmemSize
     902             :  *      Compute space needed for replication launcher shared memory
     903             :  */
     904             : Size
     905        7338 : ApplyLauncherShmemSize(void)
     906             : {
     907             :     Size        size;
     908             : 
     909             :     /*
     910             :      * Need the fixed struct and the array of LogicalRepWorker.
     911             :      */
     912        7338 :     size = sizeof(LogicalRepCtxStruct);
     913        7338 :     size = MAXALIGN(size);
     914        7338 :     size = add_size(size, mul_size(max_logical_replication_workers,
     915             :                                    sizeof(LogicalRepWorker)));
     916        7338 :     return size;
     917             : }
     918             : 
     919             : /*
     920             :  * ApplyLauncherRegister
     921             :  *      Register a background worker running the logical replication launcher.
     922             :  */
     923             : void
     924        1528 : ApplyLauncherRegister(void)
     925             : {
     926             :     BackgroundWorker bgw;
     927             : 
     928             :     /*
     929             :      * The logical replication launcher is disabled during binary upgrades, to
     930             :      * prevent logical replication workers from running on the source cluster.
     931             :      * That could cause replication origins to move forward after having been
     932             :      * copied to the target cluster, potentially creating conflicts with the
     933             :      * copied data files.
     934             :      */
     935        1528 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
     936          54 :         return;
     937             : 
     938        1474 :     memset(&bgw, 0, sizeof(bgw));
     939        1474 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     940             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     941        1474 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     942        1474 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     943        1474 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     944        1474 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     945             :              "logical replication launcher");
     946        1474 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
     947             :              "logical replication launcher");
     948        1474 :     bgw.bgw_restart_time = 5;
     949        1474 :     bgw.bgw_notify_pid = 0;
     950        1474 :     bgw.bgw_main_arg = (Datum) 0;
     951             : 
     952        1474 :     RegisterBackgroundWorker(&bgw);
     953             : }
     954             : 
     955             : /*
     956             :  * ApplyLauncherShmemInit
     957             :  *      Allocate and initialize replication launcher shared memory
     958             :  */
     959             : void
     960        1902 : ApplyLauncherShmemInit(void)
     961             : {
     962             :     bool        found;
     963             : 
     964        1902 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     965        1902 :         ShmemInitStruct("Logical Replication Launcher Data",
     966             :                         ApplyLauncherShmemSize(),
     967             :                         &found);
     968             : 
     969        1902 :     if (!found)
     970             :     {
     971             :         int         slot;
     972             : 
     973        1902 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     974             : 
     975        1902 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
     976        1902 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
     977             : 
     978             :         /* Initialize memory and spin locks for each worker slot. */
     979        9452 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
     980             :         {
     981        7550 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
     982             : 
     983        7550 :             memset(worker, 0, sizeof(LogicalRepWorker));
     984        7550 :             SpinLockInit(&worker->relmutex);
     985             :         }
     986             :     }
     987        1902 : }
     988             : 
     989             : /*
     990             :  * Initialize or attach to the dynamic shared hash table that stores the
     991             :  * last-start times, if not already done.
     992             :  * This must be called before accessing the table.
     993             :  */
     994             : static void
     995         852 : logicalrep_launcher_attach_dshmem(void)
     996             : {
     997             :     MemoryContext oldcontext;
     998             : 
     999             :     /* Quick exit if we already did this. */
    1000         852 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
    1001         754 :         last_start_times != NULL)
    1002         542 :         return;
    1003             : 
    1004             :     /* Otherwise, use a lock to ensure only one process creates the table. */
    1005         310 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    1006             : 
    1007             :     /* Be sure any local memory allocated by DSA routines is persistent. */
    1008         310 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1009             : 
    1010         310 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
    1011             :     {
    1012             :         /* Initialize dynamic shared hash table for last-start times. */
    1013          98 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
    1014          98 :         dsa_pin(last_start_times_dsa);
    1015          98 :         dsa_pin_mapping(last_start_times_dsa);
    1016          98 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
    1017             : 
    1018             :         /* Store handles in shared memory for other backends to use. */
    1019          98 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
    1020          98 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
    1021             :     }
    1022         212 :     else if (!last_start_times)
    1023             :     {
    1024             :         /* Attach to existing dynamic shared hash table. */
    1025         212 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
    1026         212 :         dsa_pin_mapping(last_start_times_dsa);
    1027         212 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
    1028         212 :                                          LogicalRepCtx->last_start_dsh, 0);
    1029             :     }
    1030             : 
    1031         310 :     MemoryContextSwitchTo(oldcontext);
    1032         310 :     LWLockRelease(LogicalRepWorkerLock);
    1033             : }
    1034             : 
    1035             : /*
    1036             :  * Set the last-start time for the subscription.
    1037             :  */
    1038             : static void
    1039         268 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
    1040             : {
    1041             :     LauncherLastStartTimesEntry *entry;
    1042             :     bool        found;
    1043             : 
    1044         268 :     logicalrep_launcher_attach_dshmem();
    1045             : 
    1046         268 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
    1047         268 :     entry->last_start_time = start_time;
    1048         268 :     dshash_release_lock(last_start_times, entry);
    1049         268 : }
    1050             : 
    1051             : /*
    1052             :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1053             :  */
    1054             : static TimestampTz
    1055         312 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1056             : {
    1057             :     LauncherLastStartTimesEntry *entry;
    1058             :     TimestampTz ret;
    1059             : 
    1060         312 :     logicalrep_launcher_attach_dshmem();
    1061             : 
    1062         312 :     entry = dshash_find(last_start_times, &subid, false);
    1063         312 :     if (entry == NULL)
    1064         230 :         return 0;
    1065             : 
    1066          82 :     ret = entry->last_start_time;
    1067          82 :     dshash_release_lock(last_start_times, entry);
    1068             : 
    1069          82 :     return ret;
    1070             : }
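
Taken together, the two functions above show dshash's entry-locking
protocol: dshash_find and dshash_find_or_insert return with the matching
partition lock held, so callers must copy or update the payload and then
release promptly with dshash_release_lock. A minimal sketch of the same
protocol for a hypothetical per-subscription counter (CounterEntry and the
helper functions are illustrative, not part of launcher.c):

    typedef struct CounterEntry
    {
        Oid         subid;      /* hash key, matching the table's key_size */
        int64       count;
    } CounterEntry;

    static void
    bump_counter(dshash_table *table, Oid subid)
    {
        CounterEntry *entry;
        bool        found;

        /* Returns with the entry locked exclusively. */
        entry = dshash_find_or_insert(table, &subid, &found);
        if (!found)
            entry->count = 0;   /* dshash initializes only the key */
        entry->count++;
        dshash_release_lock(table, entry);
    }

    static int64
    read_counter(dshash_table *table, Oid subid)
    {
        CounterEntry *entry;
        int64       result;

        /* Passing false requests a shared (read) lock on the entry. */
        entry = dshash_find(table, &subid, false);
        if (entry == NULL)
            return 0;
        result = entry->count;
        dshash_release_lock(table, entry);
        return result;
    }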
    1071             : 
    1072             : /*
    1073             :  * Remove the last-start-time entry for the subscription, if one exists.
    1074             :  *
     1075             :  * This has two use cases: to remove the entry for a subscription
    1076             :  * that's been deleted or disabled (just to avoid leaking shared memory),
    1077             :  * and to allow immediate restart of an apply worker that has exited
    1078             :  * due to subscription parameter changes.
    1079             :  */
    1080             : void
    1081         272 : ApplyLauncherForgetWorkerStartTime(Oid subid)
    1082             : {
    1083         272 :     logicalrep_launcher_attach_dshmem();
    1084             : 
    1085         272 :     (void) dshash_delete_key(last_start_times, &subid);
    1086         272 : }
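
The second use case, immediate restart, works by pairing this call with the
commit-time wakeup below: once the throttle entry is gone, the launcher sees
last_start == 0 and relaunches without waiting out
wal_retrieve_retry_interval. A hedged sketch of what a DDL path such as
ALTER SUBSCRIPTION might do (the helper itself is hypothetical):

    static void
    request_immediate_restart(Oid subid)
    {
        /* Drop the throttle entry for this subscription. */
        ApplyLauncherForgetWorkerStartTime(subid);

        /* Ask the launcher to rescan pg_subscription once we commit. */
        ApplyLauncherWakeupAtCommit();
    }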
    1087             : 
    1088             : /*
     1089             :  * Wake up the launcher on commit if requested.
    1090             :  */
    1091             : void
    1092      748464 : AtEOXact_ApplyLauncher(bool isCommit)
    1093             : {
    1094      748464 :     if (isCommit)
    1095             :     {
    1096      701178 :         if (on_commit_launcher_wakeup)
    1097         244 :             ApplyLauncherWakeup();
    1098             :     }
    1099             : 
    1100      748464 :     on_commit_launcher_wakeup = false;
    1101      748464 : }
    1102             : 
    1103             : /*
    1104             :  * Request wakeup of the launcher on commit of the transaction.
    1105             :  *
     1106             :  * This is used to signal the launcher to stop sleeping and process the
     1107             :  * subscriptions when the current transaction commits.  It should be used
     1108             :  * when a new tuple has been added to the pg_subscription catalog.
     1109             :  */
    1110             : void
    1111         244 : ApplyLauncherWakeupAtCommit(void)
    1112             : {
    1113         244 :     if (!on_commit_launcher_wakeup)
    1114         244 :         on_commit_launcher_wakeup = true;
    1115         244 : }
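
Together with AtEOXact_ApplyLauncher above, this implements a deferred-action
flag: code sets a backend-local flag mid-transaction, and the end-of-
transaction hook acts on it only if the transaction actually commits, then
clears it either way. The same shape reduced to its essentials (all names
hypothetical):

    static bool pending_action = false;

    void
    RequestActionAtCommit(void)
    {
        pending_action = true;
    }

    void
    AtEOXact_MyModule(bool isCommit)
    {
        if (isCommit && pending_action)
            do_action();            /* e.g. signal another process */
        pending_action = false;     /* reset on commit and abort alike */
    }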
    1116             : 
    1117             : static void
    1118        1008 : ApplyLauncherWakeup(void)
    1119             : {
    1120        1008 :     if (LogicalRepCtx->launcher_pid != 0)
    1121         950 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1122        1008 : }
    1123             : 
    1124             : /*
    1125             :  * Main loop for the apply launcher process.
    1126             :  */
    1127             : void
    1128         740 : ApplyLauncherMain(Datum main_arg)
    1129             : {
    1130         740 :     ereport(DEBUG1,
    1131             :             (errmsg_internal("logical replication launcher started")));
    1132             : 
    1133         740 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1134             : 
    1135             :     Assert(LogicalRepCtx->launcher_pid == 0);
    1136         740 :     LogicalRepCtx->launcher_pid = MyProcPid;
    1137             : 
    1138             :     /* Establish signal handlers. */
    1139         740 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1140         740 :     pqsignal(SIGTERM, die);
    1141         740 :     BackgroundWorkerUnblockSignals();
    1142             : 
    1143             :     /*
    1144             :      * Establish connection to nailed catalogs (we only ever access
    1145             :      * pg_subscription).
    1146             :      */
    1147         740 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1148             : 
    1149             :     /* Enter main loop */
    1150             :     for (;;)
    1151        3714 :     {
    1152             :         int         rc;
    1153             :         List       *sublist;
    1154             :         ListCell   *lc;
    1155             :         MemoryContext subctx;
    1156             :         MemoryContext oldctx;
    1157        4454 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1158             : 
    1159        4454 :         CHECK_FOR_INTERRUPTS();
    1160             : 
    1161             :         /* Use temporary context to avoid leaking memory across cycles. */
    1162        4422 :         subctx = AllocSetContextCreate(TopMemoryContext,
    1163             :                                        "Logical Replication Launcher sublist",
    1164             :                                        ALLOCSET_DEFAULT_SIZES);
    1165        4422 :         oldctx = MemoryContextSwitchTo(subctx);
    1166             : 
    1167             :         /* Start any missing workers for enabled subscriptions. */
    1168        4422 :         sublist = get_subscription_list();
    1169        5212 :         foreach(lc, sublist)
    1170             :         {
    1171         792 :             Subscription *sub = (Subscription *) lfirst(lc);
    1172             :             LogicalRepWorker *w;
    1173             :             TimestampTz last_start;
    1174             :             TimestampTz now;
    1175             :             long        elapsed;
    1176             : 
    1177         792 :             if (!sub->enabled)
    1178          66 :                 continue;
    1179             : 
    1180         726 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1181         726 :             w = logicalrep_worker_find(sub->oid, InvalidOid, false);
    1182         726 :             LWLockRelease(LogicalRepWorkerLock);
    1183             : 
    1184         726 :             if (w != NULL)
    1185         414 :                 continue;       /* worker is running already */
    1186             : 
    1187             :             /*
    1188             :              * If the worker is eligible to start now, launch it.  Otherwise,
    1189             :              * adjust wait_time so that we'll wake up as soon as it can be
    1190             :              * started.
    1191             :              *
    1192             :              * Each subscription's apply worker can only be restarted once per
    1193             :              * wal_retrieve_retry_interval, so that errors do not cause us to
    1194             :              * repeatedly restart the worker as fast as possible.  In cases
    1195             :              * where a restart is expected (e.g., subscription parameter
    1196             :              * changes), another process should remove the last-start entry
    1197             :              * for the subscription so that the worker can be restarted
    1198             :              * without waiting for wal_retrieve_retry_interval to elapse.
    1199             :              */
    1200         312 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1201         312 :             now = GetCurrentTimestamp();
    1202         312 :             if (last_start == 0 ||
    1203          82 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1204             :             {
    1205         268 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1206         268 :                 logicalrep_worker_launch(WORKERTYPE_APPLY,
    1207         268 :                                          sub->dbid, sub->oid, sub->name,
    1208             :                                          sub->owner, InvalidOid,
    1209             :                                          DSM_HANDLE_INVALID);
    1210             :             }
    1211             :             else
    1212             :             {
    1213          44 :                 wait_time = Min(wait_time,
    1214             :                                 wal_retrieve_retry_interval - elapsed);
    1215             :             }
    1216             :         }
    1217             : 
    1218             :         /* Switch back to original memory context. */
    1219        4420 :         MemoryContextSwitchTo(oldctx);
    1220             :         /* Clean the temporary memory. */
    1221        4420 :         MemoryContextDelete(subctx);
    1222             : 
    1223             :         /* Wait for more work. */
    1224        4420 :         rc = WaitLatch(MyLatch,
    1225             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1226             :                        wait_time,
    1227             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1228             : 
    1229        4414 :         if (rc & WL_LATCH_SET)
    1230             :         {
    1231        4400 :             ResetLatch(MyLatch);
    1232        4400 :             CHECK_FOR_INTERRUPTS();
    1233             :         }
    1234             : 
    1235        3714 :         if (ConfigReloadPending)
    1236             :         {
    1237          58 :             ConfigReloadPending = false;
    1238          58 :             ProcessConfigFile(PGC_SIGHUP);
    1239             :         }
    1240             :     }
    1241             : 
    1242             :     /* Not reachable */
    1243             : }
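
The wait_time bookkeeping above lets the launcher sleep exactly until the
earliest throttled subscription becomes eligible again, instead of polling.
As a worked example with illustrative numbers: if wal_retrieve_retry_interval
is 5000 ms and a failed worker last started 1200 ms ago, then elapsed = 1200
and the nap shrinks from the 180000 ms default to 3800 ms:

    long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;  /* 180000 ms */
    long        elapsed = 1200;     /* ms since last start (illustrative) */
    int         interval = 5000;    /* wal_retrieve_retry_interval */

    if (elapsed < interval)
        wait_time = Min(wait_time, interval - elapsed); /* 180000 -> 3800 */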
    1244             : 
    1245             : /*
    1246             :  * Is current process the logical replication launcher?
    1247             :  */
    1248             : bool
    1249         776 : IsLogicalLauncher(void)
    1250             : {
    1251         776 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1252             : }
    1253             : 
    1254             : /*
     1255             :  * Return the pid of the leader apply worker if the given pid is the pid of a
     1256             :  * parallel apply worker; otherwise return InvalidPid.
    1257             :  */
    1258             : pid_t
    1259        1504 : GetLeaderApplyWorkerPid(pid_t pid)
    1260             : {
    1261        1504 :     int         leader_pid = InvalidPid;
    1262             :     int         i;
    1263             : 
    1264        1504 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1265             : 
    1266        7520 :     for (i = 0; i < max_logical_replication_workers; i++)
    1267             :     {
    1268        6016 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1269             : 
    1270        6016 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
    1271             :         {
    1272           0 :             leader_pid = w->leader_pid;
    1273           0 :             break;
    1274             :         }
    1275             :     }
    1276             : 
    1277        1504 :     LWLockRelease(LogicalRepWorkerLock);
    1278             : 
    1279        1504 :     return leader_pid;
    1280             : }
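
A caller, such as a statistics view filling in a leader_pid column, might
use the lookup like this (the wrapper function is hypothetical):

    static void
    report_leader(pid_t pid)
    {
        pid_t       leader_pid = GetLeaderApplyWorkerPid(pid);

        if (leader_pid != InvalidPid)
            elog(LOG, "pid %d is a parallel apply worker led by pid %d",
                 (int) pid, (int) leader_pid);
    }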
    1281             : 
    1282             : /*
     1283             :  * Return the state of each subscription's workers, one row per worker.
    1284             :  */
    1285             : Datum
    1286           2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1287             : {
    1288             : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
    1289           2 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1290             :     int         i;
    1291           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1292             : 
    1293           2 :     InitMaterializedSRF(fcinfo, 0);
    1294             : 
     1295             :     /* Make sure we get a consistent view of the workers. */
    1296           2 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1297             : 
    1298          10 :     for (i = 0; i < max_logical_replication_workers; i++)
    1299             :     {
    1300             :         /* for each row */
    1301           8 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1302           8 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1303             :         int         worker_pid;
    1304             :         LogicalRepWorker worker;
    1305             : 
    1306           8 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1307             :                sizeof(LogicalRepWorker));
    1308           8 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1309           4 :             continue;
    1310             : 
    1311           4 :         if (OidIsValid(subid) && worker.subid != subid)
    1312           0 :             continue;
    1313             : 
    1314           4 :         worker_pid = worker.proc->pid;
    1315             : 
    1316           4 :         values[0] = ObjectIdGetDatum(worker.subid);
    1317           4 :         if (isTablesyncWorker(&worker))
    1318           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1319             :         else
    1320           4 :             nulls[1] = true;
    1321           4 :         values[2] = Int32GetDatum(worker_pid);
    1322             : 
    1323           4 :         if (isParallelApplyWorker(&worker))
    1324           0 :             values[3] = Int32GetDatum(worker.leader_pid);
    1325             :         else
    1326           4 :             nulls[3] = true;
    1327             : 
    1328           4 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
    1329           2 :             nulls[4] = true;
    1330             :         else
    1331           2 :             values[4] = LSNGetDatum(worker.last_lsn);
    1332           4 :         if (worker.last_send_time == 0)
    1333           0 :             nulls[5] = true;
    1334             :         else
    1335           4 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
    1336           4 :         if (worker.last_recv_time == 0)
    1337           0 :             nulls[6] = true;
    1338             :         else
    1339           4 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1340           4 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1341           2 :             nulls[7] = true;
    1342             :         else
    1343           2 :             values[7] = LSNGetDatum(worker.reply_lsn);
    1344           4 :         if (worker.reply_time == 0)
    1345           0 :             nulls[8] = true;
    1346             :         else
    1347           4 :             values[8] = TimestampTzGetDatum(worker.reply_time);
    1348             : 
    1349           4 :         switch (worker.type)
    1350             :         {
    1351           4 :             case WORKERTYPE_APPLY:
    1352           4 :                 values[9] = CStringGetTextDatum("apply");
    1353           4 :                 break;
    1354           0 :             case WORKERTYPE_PARALLEL_APPLY:
    1355           0 :                 values[9] = CStringGetTextDatum("parallel apply");
    1356           0 :                 break;
    1357           0 :             case WORKERTYPE_TABLESYNC:
    1358           0 :                 values[9] = CStringGetTextDatum("table synchronization");
    1359           0 :                 break;
    1360           0 :             case WORKERTYPE_UNKNOWN:
    1361             :                 /* Should never happen. */
    1362           0 :                 elog(ERROR, "unknown worker type");
    1363             :         }
    1364             : 
    1365           4 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1366             :                              values, nulls);
    1367             : 
    1368             :         /*
    1369             :          * If only a single subscription was requested, and we found it,
    1370             :          * break.
    1371             :          */
    1372           4 :         if (OidIsValid(subid))
    1373           0 :             break;
    1374             :     }
    1375             : 
    1376           2 :     LWLockRelease(LogicalRepWorkerLock);
    1377             : 
    1378           2 :     return (Datum) 0;
    1379             : }
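
pg_stat_get_subscription follows the standard recipe for a materialized
set-returning function: InitMaterializedSRF validates the call context and
fills in rsinfo->setResult and rsinfo->setDesc, and each row is appended
with tuplestore_putvalues. A minimal self-contained sketch of the same
recipe for a hypothetical extension function (the SQL-level declaration
would have to return a matching (int, text) record set):

    #include "postgres.h"

    #include "funcapi.h"
    #include "utils/builtins.h"
    #include "utils/tuplestore.h"

    PG_MODULE_MAGIC;

    PG_FUNCTION_INFO_V1(my_example_srf);

    Datum
    my_example_srf(PG_FUNCTION_ARGS)
    {
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

        /* Sets up the tuplestore and result descriptor for us. */
        InitMaterializedSRF(fcinfo, 0);

        for (int i = 0; i < 3; i++)
        {
            Datum       values[2] = {0};
            bool        nulls[2] = {0};

            values[0] = Int32GetDatum(i);
            values[1] = CStringGetTextDatum("example");

            tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                                 values, nulls);
        }

        return (Datum) 0;
    }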

Generated by: LCOV version 1.14