LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18beta1 Lines: 416 472 88.1 %
Date: 2025-06-28 01:17:10 Functions: 31 31 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
      11             :  *    This module contains the logical replication worker launcher which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "access/heapam.h"
      21             : #include "access/htup.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/pg_subscription.h"
      26             : #include "catalog/pg_subscription_rel.h"
      27             : #include "funcapi.h"
      28             : #include "lib/dshash.h"
      29             : #include "miscadmin.h"
      30             : #include "pgstat.h"
      31             : #include "postmaster/bgworker.h"
      32             : #include "postmaster/interrupt.h"
      33             : #include "replication/logicallauncher.h"
      34             : #include "replication/origin.h"
      35             : #include "replication/walreceiver.h"
      36             : #include "replication/worker_internal.h"
      37             : #include "storage/ipc.h"
      38             : #include "storage/proc.h"
      39             : #include "storage/procarray.h"
      40             : #include "tcop/tcopprot.h"
      41             : #include "utils/builtins.h"
      42             : #include "utils/memutils.h"
      43             : #include "utils/pg_lsn.h"
      44             : #include "utils/snapmgr.h"
      45             : 
      46             : /* max sleep time between cycles (3min) */
      47             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      48             : 
      49             : /* GUC variables */
      50             : int         max_logical_replication_workers = 4;
      51             : int         max_sync_workers_per_subscription = 2;
      52             : int         max_parallel_apply_workers_per_subscription = 2;
      53             : 
      54             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      55             : 
      56             : typedef struct LogicalRepCtxStruct
      57             : {
      58             :     /* Supervisor process. */
      59             :     pid_t       launcher_pid;
      60             : 
      61             :     /* Hash table holding last start times of subscriptions' apply workers. */
      62             :     dsa_handle  last_start_dsa;
      63             :     dshash_table_handle last_start_dsh;
      64             : 
      65             :     /* Background workers. */
      66             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      67             : } LogicalRepCtxStruct;
      68             : 
      69             : static LogicalRepCtxStruct *LogicalRepCtx;
      70             : 
      71             : /* an entry in the last-start-times shared hash table */
      72             : typedef struct LauncherLastStartTimesEntry
      73             : {
      74             :     Oid         subid;          /* OID of logrep subscription (hash key) */
      75             :     TimestampTz last_start_time;    /* last time its apply worker was started */
      76             : } LauncherLastStartTimesEntry;
      77             : 
      78             : /* parameters for the last-start-times shared hash table */
      79             : static const dshash_parameters dsh_params = {
      80             :     sizeof(Oid),
      81             :     sizeof(LauncherLastStartTimesEntry),
      82             :     dshash_memcmp,
      83             :     dshash_memhash,
      84             :     dshash_memcpy,
      85             :     LWTRANCHE_LAUNCHER_HASH
      86             : };
      87             : 
      88             : static dsa_area *last_start_times_dsa = NULL;
      89             : static dshash_table *last_start_times = NULL;
      90             : 
      91             : static bool on_commit_launcher_wakeup = false;
      92             : 
      93             : 
      94             : static void ApplyLauncherWakeup(void);
      95             : static void logicalrep_launcher_onexit(int code, Datum arg);
      96             : static void logicalrep_worker_onexit(int code, Datum arg);
      97             : static void logicalrep_worker_detach(void);
      98             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
      99             : static int  logicalrep_pa_worker_count(Oid subid);
     100             : static void logicalrep_launcher_attach_dshmem(void);
     101             : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
     102             : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
     103             : 
     104             : 
     105             : /*
     106             :  * Load the list of subscriptions.
     107             :  *
     108             :  * Only the fields interesting for worker start/stop functions are filled for
     109             :  * each subscription.
     110             :  */
     111             : static List *
     112        5248 : get_subscription_list(void)
     113             : {
     114        5248 :     List       *res = NIL;
     115             :     Relation    rel;
     116             :     TableScanDesc scan;
     117             :     HeapTuple   tup;
     118             :     MemoryContext resultcxt;
     119             : 
     120             :     /* This is the context that we will allocate our output data in */
     121        5248 :     resultcxt = CurrentMemoryContext;
     122             : 
     123             :     /*
     124             :      * Start a transaction so we can access pg_subscription.
     125             :      */
     126        5248 :     StartTransactionCommand();
     127             : 
     128        5248 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     129        5248 :     scan = table_beginscan_catalog(rel, 0, NULL);
     130             : 
     131        6526 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     132             :     {
     133        1278 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     134             :         Subscription *sub;
     135             :         MemoryContext oldcxt;
     136             : 
     137             :         /*
     138             :          * Allocate our results in the caller's context, not the
     139             :          * transaction's. We do this inside the loop, and restore the original
     140             :          * context at the end, so that leaky things like heap_getnext() are
     141             :          * not called in a potentially long-lived context.
     142             :          */
     143        1278 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     144             : 
     145        1278 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     146        1278 :         sub->oid = subform->oid;
     147        1278 :         sub->dbid = subform->subdbid;
     148        1278 :         sub->owner = subform->subowner;
     149        1278 :         sub->enabled = subform->subenabled;
     150        1278 :         sub->name = pstrdup(NameStr(subform->subname));
     151             :         /* We don't fill fields we are not interested in. */
     152             : 
     153        1278 :         res = lappend(res, sub);
     154        1278 :         MemoryContextSwitchTo(oldcxt);
     155             :     }
     156             : 
     157        5248 :     table_endscan(scan);
     158        5248 :     table_close(rel, AccessShareLock);
     159             : 
     160        5248 :     CommitTransactionCommand();
     161             : 
     162        5248 :     return res;
     163             : }
     164             : 
     165             : /*
     166             :  * Wait for a background worker to start up and attach to the shmem context.
     167             :  *
     168             :  * This is only needed for cleaning up the shared memory in case the worker
     169             :  * fails to attach.
     170             :  *
     171             :  * Returns whether the attach was successful.
     172             :  */
     173             : static bool
     174         696 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     175             :                                uint16 generation,
     176             :                                BackgroundWorkerHandle *handle)
     177             : {
     178         696 :     bool        result = false;
     179         696 :     bool        dropped_latch = false;
     180             : 
     181             :     for (;;)
     182        2000 :     {
     183             :         BgwHandleStatus status;
     184             :         pid_t       pid;
     185             :         int         rc;
     186             : 
     187        2696 :         CHECK_FOR_INTERRUPTS();
     188             : 
     189        2696 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     190             : 
     191             :         /* Worker either died or has started. Return false if died. */
     192        2696 :         if (!worker->in_use || worker->proc)
     193             :         {
     194         694 :             result = worker->in_use;
     195         694 :             LWLockRelease(LogicalRepWorkerLock);
     196         694 :             break;
     197             :         }
     198             : 
     199        2002 :         LWLockRelease(LogicalRepWorkerLock);
     200             : 
     201             :         /* Check if worker has died before attaching, and clean up after it. */
     202        2002 :         status = GetBackgroundWorkerPid(handle, &pid);
     203             : 
     204        2002 :         if (status == BGWH_STOPPED)
     205             :         {
     206           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     207             :             /* Ensure that this was indeed the worker we waited for. */
     208           0 :             if (generation == worker->generation)
     209           0 :                 logicalrep_worker_cleanup(worker);
     210           0 :             LWLockRelease(LogicalRepWorkerLock);
     211           0 :             break;              /* result is already false */
     212             :         }
     213             : 
     214             :         /*
     215             :          * We need timeout because we generally don't get notified via latch
     216             :          * about the worker attach.  But we don't expect to have to wait long.
     217             :          */
     218        2002 :         rc = WaitLatch(MyLatch,
     219             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     220             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     221             : 
     222        2002 :         if (rc & WL_LATCH_SET)
     223             :         {
     224         848 :             ResetLatch(MyLatch);
     225         848 :             CHECK_FOR_INTERRUPTS();
     226         846 :             dropped_latch = true;
     227             :         }
     228             :     }
     229             : 
     230             :     /*
     231             :      * If we had to clear a latch event in order to wait, be sure to restore
     232             :      * it before exiting.  Otherwise caller may miss events.
     233             :      */
     234         694 :     if (dropped_latch)
     235         694 :         SetLatch(MyLatch);
     236             : 
     237         694 :     return result;
     238             : }
     239             : 
     240             : /*
     241             :  * Walks the workers array and searches for one that matches given
     242             :  * subscription id and relid.
     243             :  *
     244             :  * We are only interested in the leader apply worker or table sync worker.
     245             :  */
     246             : LogicalRepWorker *
     247        5542 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     248             : {
     249             :     int         i;
     250        5542 :     LogicalRepWorker *res = NULL;
     251             : 
     252             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     253             : 
     254             :     /* Search for attached worker for a given subscription id. */
     255       16654 :     for (i = 0; i < max_logical_replication_workers; i++)
     256             :     {
     257       14612 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     258             : 
     259             :         /* Skip parallel apply workers. */
     260       14612 :         if (isParallelApplyWorker(w))
     261           0 :             continue;
     262             : 
     263       14612 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     264        3500 :             (!only_running || w->proc))
     265             :         {
     266        3500 :             res = w;
     267        3500 :             break;
     268             :         }
     269             :     }
     270             : 
     271        5542 :     return res;
     272             : }
     273             : 
     274             : /*
     275             :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     276             :  * the subscription, instead of just one.
     277             :  */
     278             : List *
     279        1050 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
     280             : {
     281             :     int         i;
     282        1050 :     List       *res = NIL;
     283             : 
     284        1050 :     if (acquire_lock)
     285         216 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     286             : 
     287             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     288             : 
     289             :     /* Search for attached worker for a given subscription id. */
     290        5442 :     for (i = 0; i < max_logical_replication_workers; i++)
     291             :     {
     292        4392 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     293             : 
     294        4392 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     295         716 :             res = lappend(res, w);
     296             :     }
     297             : 
     298        1050 :     if (acquire_lock)
     299         216 :         LWLockRelease(LogicalRepWorkerLock);
     300             : 
     301        1050 :     return res;
     302             : }
     303             : 
     304             : /*
     305             :  * Start new logical replication background worker, if possible.
     306             :  *
     307             :  * Returns true on success, false on failure.
     308             :  */
     309             : bool
     310         696 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
     311             :                          Oid dbid, Oid subid, const char *subname, Oid userid,
     312             :                          Oid relid, dsm_handle subworker_dsm)
     313             : {
     314             :     BackgroundWorker bgw;
     315             :     BackgroundWorkerHandle *bgw_handle;
     316             :     uint16      generation;
     317             :     int         i;
     318         696 :     int         slot = 0;
     319         696 :     LogicalRepWorker *worker = NULL;
     320             :     int         nsyncworkers;
     321             :     int         nparallelapplyworkers;
     322             :     TimestampTz now;
     323         696 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
     324         696 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
     325             : 
     326             :     /*----------
     327             :      * Sanity checks:
     328             :      * - must be valid worker type
     329             :      * - tablesync workers are only ones to have relid
     330             :      * - parallel apply worker is the only kind of subworker
     331             :      */
     332             :     Assert(wtype != WORKERTYPE_UNKNOWN);
     333             :     Assert(is_tablesync_worker == OidIsValid(relid));
     334             :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
     335             : 
     336         696 :     ereport(DEBUG1,
     337             :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     338             :                              subname)));
     339             : 
     340             :     /* Report this after the initial starting message for consistency. */
     341         696 :     if (max_active_replication_origins == 0)
     342           0 :         ereport(ERROR,
     343             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     344             :                  errmsg("cannot start logical replication workers when \"max_active_replication_origins\"=0")));
     345             : 
     346             :     /*
     347             :      * We need to do the modification of the shared memory under lock so that
     348             :      * we have consistent view.
     349             :      */
     350         696 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     351             : 
     352         696 : retry:
     353             :     /* Find unused worker slot. */
     354        1278 :     for (i = 0; i < max_logical_replication_workers; i++)
     355             :     {
     356        1278 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     357             : 
     358        1278 :         if (!w->in_use)
     359             :         {
     360         696 :             worker = w;
     361         696 :             slot = i;
     362         696 :             break;
     363             :         }
     364             :     }
     365             : 
     366         696 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     367             : 
     368         696 :     now = GetCurrentTimestamp();
     369             : 
     370             :     /*
     371             :      * If we didn't find a free slot, try to do garbage collection.  The
     372             :      * reason we do this is because if some worker failed to start up and its
     373             :      * parent has crashed while waiting, the in_use state was never cleared.
     374             :      */
     375         696 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     376             :     {
     377           0 :         bool        did_cleanup = false;
     378             : 
     379           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     380             :         {
     381           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     382             : 
     383             :             /*
     384             :              * If the worker was marked in use but didn't manage to attach in
     385             :              * time, clean it up.
     386             :              */
     387           0 :             if (w->in_use && !w->proc &&
     388           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     389             :                                            wal_receiver_timeout))
     390             :             {
     391           0 :                 elog(WARNING,
     392             :                      "logical replication worker for subscription %u took too long to start; canceled",
     393             :                      w->subid);
     394             : 
     395           0 :                 logicalrep_worker_cleanup(w);
     396           0 :                 did_cleanup = true;
     397             :             }
     398             :         }
     399             : 
     400           0 :         if (did_cleanup)
     401           0 :             goto retry;
     402             :     }
     403             : 
     404             :     /*
     405             :      * We don't allow to invoke more sync workers once we have reached the
     406             :      * sync worker limit per subscription. So, just return silently as we
     407             :      * might get here because of an otherwise harmless race condition.
     408             :      */
     409         696 :     if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
     410             :     {
     411           0 :         LWLockRelease(LogicalRepWorkerLock);
     412           0 :         return false;
     413             :     }
     414             : 
     415         696 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     416             : 
     417             :     /*
     418             :      * Return false if the number of parallel apply workers reached the limit
     419             :      * per subscription.
     420             :      */
     421         696 :     if (is_parallel_apply_worker &&
     422          20 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     423             :     {
     424           0 :         LWLockRelease(LogicalRepWorkerLock);
     425           0 :         return false;
     426             :     }
     427             : 
     428             :     /*
     429             :      * However if there are no more free worker slots, inform user about it
     430             :      * before exiting.
     431             :      */
     432         696 :     if (worker == NULL)
     433             :     {
     434           0 :         LWLockRelease(LogicalRepWorkerLock);
     435           0 :         ereport(WARNING,
     436             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     437             :                  errmsg("out of logical replication worker slots"),
     438             :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
     439           0 :         return false;
     440             :     }
     441             : 
     442             :     /* Prepare the worker slot. */
     443         696 :     worker->type = wtype;
     444         696 :     worker->launch_time = now;
     445         696 :     worker->in_use = true;
     446         696 :     worker->generation++;
     447         696 :     worker->proc = NULL;
     448         696 :     worker->dbid = dbid;
     449         696 :     worker->userid = userid;
     450         696 :     worker->subid = subid;
     451         696 :     worker->relid = relid;
     452         696 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     453         696 :     worker->relstate_lsn = InvalidXLogRecPtr;
     454         696 :     worker->stream_fileset = NULL;
     455         696 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     456         696 :     worker->parallel_apply = is_parallel_apply_worker;
     457         696 :     worker->last_lsn = InvalidXLogRecPtr;
     458         696 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     459         696 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     460         696 :     worker->reply_lsn = InvalidXLogRecPtr;
     461         696 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     462             : 
     463             :     /* Before releasing lock, remember generation for future identification. */
     464         696 :     generation = worker->generation;
     465             : 
     466         696 :     LWLockRelease(LogicalRepWorkerLock);
     467             : 
     468             :     /* Register the new dynamic worker. */
     469         696 :     memset(&bgw, 0, sizeof(bgw));
     470         696 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     471             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     472         696 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     473         696 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     474             : 
     475         696 :     switch (worker->type)
     476             :     {
     477         292 :         case WORKERTYPE_APPLY:
     478         292 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     479         292 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     480             :                      "logical replication apply worker for subscription %u",
     481             :                      subid);
     482         292 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
     483         292 :             break;
     484             : 
     485          20 :         case WORKERTYPE_PARALLEL_APPLY:
     486          20 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     487          20 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     488             :                      "logical replication parallel apply worker for subscription %u",
     489             :                      subid);
     490          20 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     491             : 
     492          20 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     493          20 :             break;
     494             : 
     495         384 :         case WORKERTYPE_TABLESYNC:
     496         384 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
     497         384 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     498             :                      "logical replication tablesync worker for subscription %u sync %u",
     499             :                      subid,
     500             :                      relid);
     501         384 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
     502         384 :             break;
     503             : 
     504           0 :         case WORKERTYPE_UNKNOWN:
     505             :             /* Should never happen. */
     506           0 :             elog(ERROR, "unknown worker type");
     507             :     }
     508             : 
     509         696 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     510         696 :     bgw.bgw_notify_pid = MyProcPid;
     511         696 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     512             : 
     513         696 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     514             :     {
     515             :         /* Failed to start worker, so clean up the worker slot. */
     516           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     517             :         Assert(generation == worker->generation);
     518           0 :         logicalrep_worker_cleanup(worker);
     519           0 :         LWLockRelease(LogicalRepWorkerLock);
     520             : 
     521           0 :         ereport(WARNING,
     522             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     523             :                  errmsg("out of background worker slots"),
     524             :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
     525           0 :         return false;
     526             :     }
     527             : 
     528             :     /* Now wait until it attaches. */
     529         696 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     530             : }
     531             : 
     532             : /*
     533             :  * Internal function to stop the worker and wait until it detaches from the
     534             :  * slot.
     535             :  */
     536             : static void
     537         158 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
     538             : {
     539             :     uint16      generation;
     540             : 
     541             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
     542             : 
     543             :     /*
     544             :      * Remember which generation was our worker so we can check if what we see
     545             :      * is still the same one.
     546             :      */
     547         158 :     generation = worker->generation;
     548             : 
     549             :     /*
     550             :      * If we found a worker but it does not have proc set then it is still
     551             :      * starting up; wait for it to finish starting and then kill it.
     552             :      */
     553         162 :     while (worker->in_use && !worker->proc)
     554             :     {
     555             :         int         rc;
     556             : 
     557          10 :         LWLockRelease(LogicalRepWorkerLock);
     558             : 
     559             :         /* Wait a bit --- we don't expect to have to wait long. */
     560          10 :         rc = WaitLatch(MyLatch,
     561             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     562             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     563             : 
     564          10 :         if (rc & WL_LATCH_SET)
     565             :         {
     566           0 :             ResetLatch(MyLatch);
     567           0 :             CHECK_FOR_INTERRUPTS();
     568             :         }
     569             : 
     570             :         /* Recheck worker status. */
     571          10 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     572             : 
     573             :         /*
     574             :          * Check whether the worker slot is no longer used, which would mean
     575             :          * that the worker has exited, or whether the worker generation is
     576             :          * different, meaning that a different worker has taken the slot.
     577             :          */
     578          10 :         if (!worker->in_use || worker->generation != generation)
     579           0 :             return;
     580             : 
     581             :         /* Worker has assigned proc, so it has started. */
     582          10 :         if (worker->proc)
     583           6 :             break;
     584             :     }
     585             : 
     586             :     /* Now terminate the worker ... */
     587         158 :     kill(worker->proc->pid, signo);
     588             : 
     589             :     /* ... and wait for it to die. */
     590             :     for (;;)
     591         242 :     {
     592             :         int         rc;
     593             : 
     594             :         /* is it gone? */
     595         400 :         if (!worker->proc || worker->generation != generation)
     596             :             break;
     597             : 
     598         242 :         LWLockRelease(LogicalRepWorkerLock);
     599             : 
     600             :         /* Wait a bit --- we don't expect to have to wait long. */
     601         242 :         rc = WaitLatch(MyLatch,
     602             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     603             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     604             : 
     605         242 :         if (rc & WL_LATCH_SET)
     606             :         {
     607         116 :             ResetLatch(MyLatch);
     608         116 :             CHECK_FOR_INTERRUPTS();
     609             :         }
     610             : 
     611         242 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     612             :     }
     613             : }
     614             : 
     615             : /*
     616             :  * Stop the logical replication worker for subid/relid, if any.
     617             :  */
     618             : void
     619         182 : logicalrep_worker_stop(Oid subid, Oid relid)
     620             : {
     621             :     LogicalRepWorker *worker;
     622             : 
     623         182 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     624             : 
     625         182 :     worker = logicalrep_worker_find(subid, relid, false);
     626             : 
     627         182 :     if (worker)
     628             :     {
     629             :         Assert(!isParallelApplyWorker(worker));
     630         142 :         logicalrep_worker_stop_internal(worker, SIGTERM);
     631             :     }
     632             : 
     633         182 :     LWLockRelease(LogicalRepWorkerLock);
     634         182 : }
     635             : 
     636             : /*
     637             :  * Stop the given logical replication parallel apply worker.
     638             :  *
     639             :  * Node that the function sends SIGINT instead of SIGTERM to the parallel apply
     640             :  * worker so that the worker exits cleanly.
     641             :  */
     642             : void
     643          10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
     644             : {
     645             :     int         slot_no;
     646             :     uint16      generation;
     647             :     LogicalRepWorker *worker;
     648             : 
     649          10 :     SpinLockAcquire(&winfo->shared->mutex);
     650          10 :     generation = winfo->shared->logicalrep_worker_generation;
     651          10 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
     652          10 :     SpinLockRelease(&winfo->shared->mutex);
     653             : 
     654             :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
     655             : 
     656             :     /*
     657             :      * Detach from the error_mq_handle for the parallel apply worker before
     658             :      * stopping it. This prevents the leader apply worker from trying to
     659             :      * receive the message from the error queue that might already be detached
     660             :      * by the parallel apply worker.
     661             :      */
     662          10 :     if (winfo->error_mq_handle)
     663             :     {
     664          10 :         shm_mq_detach(winfo->error_mq_handle);
     665          10 :         winfo->error_mq_handle = NULL;
     666             :     }
     667             : 
     668          10 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     669             : 
     670          10 :     worker = &LogicalRepCtx->workers[slot_no];
     671             :     Assert(isParallelApplyWorker(worker));
     672             : 
     673             :     /*
     674             :      * Only stop the worker if the generation matches and the worker is alive.
     675             :      */
     676          10 :     if (worker->generation == generation && worker->proc)
     677          10 :         logicalrep_worker_stop_internal(worker, SIGINT);
     678             : 
     679          10 :     LWLockRelease(LogicalRepWorkerLock);
     680          10 : }
     681             : 
     682             : /*
     683             :  * Wake up (using latch) any logical replication worker for specified sub/rel.
     684             :  */
     685             : void
     686         418 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     687             : {
     688             :     LogicalRepWorker *worker;
     689             : 
     690         418 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     691             : 
     692         418 :     worker = logicalrep_worker_find(subid, relid, true);
     693             : 
     694         418 :     if (worker)
     695         418 :         logicalrep_worker_wakeup_ptr(worker);
     696             : 
     697         418 :     LWLockRelease(LogicalRepWorkerLock);
     698         418 : }
     699             : 
     700             : /*
     701             :  * Wake up (using latch) the specified logical replication worker.
     702             :  *
     703             :  * Caller must hold lock, else worker->proc could change under us.
     704             :  */
     705             : void
     706        1252 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     707             : {
     708             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     709             : 
     710        1252 :     SetLatch(&worker->proc->procLatch);
     711        1252 : }
     712             : 
     713             : /*
     714             :  * Attach to a slot.
     715             :  */
     716             : void
     717         866 : logicalrep_worker_attach(int slot)
     718             : {
     719             :     /* Block concurrent access. */
     720         866 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     721             : 
     722             :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     723         866 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     724             : 
     725         866 :     if (!MyLogicalRepWorker->in_use)
     726             :     {
     727           0 :         LWLockRelease(LogicalRepWorkerLock);
     728           0 :         ereport(ERROR,
     729             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     730             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     731             :                         slot)));
     732             :     }
     733             : 
     734         866 :     if (MyLogicalRepWorker->proc)
     735             :     {
     736           0 :         LWLockRelease(LogicalRepWorkerLock);
     737           0 :         ereport(ERROR,
     738             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     739             :                  errmsg("logical replication worker slot %d is already used by "
     740             :                         "another worker, cannot attach", slot)));
     741             :     }
     742             : 
     743         866 :     MyLogicalRepWorker->proc = MyProc;
     744         866 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     745             : 
     746         866 :     LWLockRelease(LogicalRepWorkerLock);
     747         866 : }
     748             : 
     749             : /*
     750             :  * Stop the parallel apply workers if any, and detach the leader apply worker
     751             :  * (cleans up the worker info).
     752             :  */
     753             : static void
     754         866 : logicalrep_worker_detach(void)
     755             : {
     756             :     /* Stop the parallel apply workers. */
     757         866 :     if (am_leader_apply_worker())
     758             :     {
     759             :         List       *workers;
     760             :         ListCell   *lc;
     761             : 
     762             :         /*
     763             :          * Detach from the error_mq_handle for all parallel apply workers
     764             :          * before terminating them. This prevents the leader apply worker from
     765             :          * receiving the worker termination message and sending it to logs
     766             :          * when the same is already done by the parallel worker.
     767             :          */
     768         458 :         pa_detach_all_error_mq();
     769             : 
     770         458 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     771             : 
     772         458 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
     773         922 :         foreach(lc, workers)
     774             :         {
     775         464 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     776             : 
     777         464 :             if (isParallelApplyWorker(w))
     778           6 :                 logicalrep_worker_stop_internal(w, SIGTERM);
     779             :         }
     780             : 
     781         458 :         LWLockRelease(LogicalRepWorkerLock);
     782             :     }
     783             : 
     784             :     /* Block concurrent access. */
     785         866 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     786             : 
     787         866 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     788             : 
     789         866 :     LWLockRelease(LogicalRepWorkerLock);
     790         866 : }
     791             : 
     792             : /*
     793             :  * Clean up worker info.
     794             :  */
     795             : static void
     796         866 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     797             : {
     798             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     799             : 
     800         866 :     worker->type = WORKERTYPE_UNKNOWN;
     801         866 :     worker->in_use = false;
     802         866 :     worker->proc = NULL;
     803         866 :     worker->dbid = InvalidOid;
     804         866 :     worker->userid = InvalidOid;
     805         866 :     worker->subid = InvalidOid;
     806         866 :     worker->relid = InvalidOid;
     807         866 :     worker->leader_pid = InvalidPid;
     808         866 :     worker->parallel_apply = false;
     809         866 : }
     810             : 
     811             : /*
     812             :  * Cleanup function for logical replication launcher.
     813             :  *
     814             :  * Called on logical replication launcher exit.
     815             :  */
     816             : static void
     817         812 : logicalrep_launcher_onexit(int code, Datum arg)
     818             : {
     819         812 :     LogicalRepCtx->launcher_pid = 0;
     820         812 : }
     821             : 
     822             : /*
     823             :  * Cleanup function.
     824             :  *
     825             :  * Called on logical replication worker exit.
     826             :  */
     827             : static void
     828         866 : logicalrep_worker_onexit(int code, Datum arg)
     829             : {
     830             :     /* Disconnect gracefully from the remote side. */
     831         866 :     if (LogRepWorkerWalRcvConn)
     832         768 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
     833             : 
     834         866 :     logicalrep_worker_detach();
     835             : 
     836             :     /* Cleanup fileset used for streaming transactions. */
     837         866 :     if (MyLogicalRepWorker->stream_fileset != NULL)
     838          28 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     839             : 
     840             :     /*
     841             :      * Session level locks may be acquired outside of a transaction in
     842             :      * parallel apply mode and will not be released when the worker
     843             :      * terminates, so manually release all locks before the worker exits.
     844             :      *
     845             :      * The locks will be acquired once the worker is initialized.
     846             :      */
     847         866 :     if (!InitializingApplyWorker)
     848         860 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
     849             : 
     850         866 :     ApplyLauncherWakeup();
     851         866 : }
     852             : 
     853             : /*
     854             :  * Count the number of registered (not necessarily running) sync workers
     855             :  * for a subscription.
     856             :  */
     857             : int
     858        2282 : logicalrep_sync_worker_count(Oid subid)
     859             : {
     860             :     int         i;
     861        2282 :     int         res = 0;
     862             : 
     863             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     864             : 
     865             :     /* Search for attached worker for a given subscription id. */
     866       11754 :     for (i = 0; i < max_logical_replication_workers; i++)
     867             :     {
     868        9472 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     869             : 
     870        9472 :         if (isTablesyncWorker(w) && w->subid == subid)
     871        2640 :             res++;
     872             :     }
     873             : 
     874        2282 :     return res;
     875             : }
     876             : 
     877             : /*
     878             :  * Count the number of registered (but not necessarily running) parallel apply
     879             :  * workers for a subscription.
     880             :  */
     881             : static int
     882         696 : logicalrep_pa_worker_count(Oid subid)
     883             : {
     884             :     int         i;
     885         696 :     int         res = 0;
     886             : 
     887             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     888             : 
     889             :     /*
     890             :      * Scan all attached parallel apply workers, only counting those which
     891             :      * have the given subscription id.
     892             :      */
     893        3668 :     for (i = 0; i < max_logical_replication_workers; i++)
     894             :     {
     895        2972 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     896             : 
     897        2972 :         if (isParallelApplyWorker(w) && w->subid == subid)
     898           4 :             res++;
     899             :     }
     900             : 
     901         696 :     return res;
     902             : }
     903             : 
     904             : /*
     905             :  * ApplyLauncherShmemSize
     906             :  *      Compute space needed for replication launcher shared memory
     907             :  */
     908             : Size
     909        8204 : ApplyLauncherShmemSize(void)
     910             : {
     911             :     Size        size;
     912             : 
     913             :     /*
     914             :      * Need the fixed struct and the array of LogicalRepWorker.
     915             :      */
     916        8204 :     size = sizeof(LogicalRepCtxStruct);
     917        8204 :     size = MAXALIGN(size);
     918        8204 :     size = add_size(size, mul_size(max_logical_replication_workers,
     919             :                                    sizeof(LogicalRepWorker)));
     920        8204 :     return size;
     921             : }
     922             : 
     923             : /*
     924             :  * ApplyLauncherRegister
     925             :  *      Register a background worker running the logical replication launcher.
     926             :  */
     927             : void
     928        1712 : ApplyLauncherRegister(void)
     929             : {
     930             :     BackgroundWorker bgw;
     931             : 
     932             :     /*
     933             :      * The logical replication launcher is disabled during binary upgrades, to
     934             :      * prevent logical replication workers from running on the source cluster.
     935             :      * That could cause replication origins to move forward after having been
     936             :      * copied to the target cluster, potentially creating conflicts with the
     937             :      * copied data files.
     938             :      */
     939        1712 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
     940         100 :         return;
     941             : 
     942        1612 :     memset(&bgw, 0, sizeof(bgw));
     943        1612 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     944             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     945        1612 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     946        1612 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     947        1612 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     948        1612 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     949             :              "logical replication launcher");
     950        1612 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
     951             :              "logical replication launcher");
     952        1612 :     bgw.bgw_restart_time = 5;
     953        1612 :     bgw.bgw_notify_pid = 0;
     954        1612 :     bgw.bgw_main_arg = (Datum) 0;
     955             : 
     956        1612 :     RegisterBackgroundWorker(&bgw);
     957             : }
     958             : 
     959             : /*
     960             :  * ApplyLauncherShmemInit
     961             :  *      Allocate and initialize replication launcher shared memory
     962             :  */
     963             : void
     964        2126 : ApplyLauncherShmemInit(void)
     965             : {
     966             :     bool        found;
     967             : 
     968        2126 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     969        2126 :         ShmemInitStruct("Logical Replication Launcher Data",
     970             :                         ApplyLauncherShmemSize(),
     971             :                         &found);
     972             : 
     973        2126 :     if (!found)
     974             :     {
     975             :         int         slot;
     976             : 
     977        2126 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     978             : 
     979        2126 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
     980        2126 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
     981             : 
     982             :         /* Initialize memory and spin locks for each worker slot. */
     983       10564 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
     984             :         {
     985        8438 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
     986             : 
     987        8438 :             memset(worker, 0, sizeof(LogicalRepWorker));
     988        8438 :             SpinLockInit(&worker->relmutex);
     989             :         }
     990             :     }
     991        2126 : }
     992             : 
     993             : /*
     994             :  * Initialize or attach to the dynamic shared hash table that stores the
     995             :  * last-start times, if not already done.
     996             :  * This must be called before accessing the table.
     997             :  */
     998             : static void
     999        1010 : logicalrep_launcher_attach_dshmem(void)
    1000             : {
    1001             :     MemoryContext oldcontext;
    1002             : 
    1003             :     /* Quick exit if we already did this. */
    1004        1010 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
    1005         908 :         last_start_times != NULL)
    1006         668 :         return;
    1007             : 
    1008             :     /* Otherwise, use a lock to ensure only one process creates the table. */
    1009         342 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    1010             : 
    1011             :     /* Be sure any local memory allocated by DSA routines is persistent. */
    1012         342 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1013             : 
    1014         342 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
    1015             :     {
    1016             :         /* Initialize dynamic shared hash table for last-start times. */
    1017         102 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
    1018         102 :         dsa_pin(last_start_times_dsa);
    1019         102 :         dsa_pin_mapping(last_start_times_dsa);
    1020         102 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
    1021             : 
    1022             :         /* Store handles in shared memory for other backends to use. */
    1023         102 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
    1024         102 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
    1025             :     }
    1026         240 :     else if (!last_start_times)
    1027             :     {
    1028             :         /* Attach to existing dynamic shared hash table. */
    1029         240 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
    1030         240 :         dsa_pin_mapping(last_start_times_dsa);
    1031         240 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
    1032         240 :                                          LogicalRepCtx->last_start_dsh, NULL);
    1033             :     }
    1034             : 
    1035         342 :     MemoryContextSwitchTo(oldcontext);
    1036         342 :     LWLockRelease(LogicalRepWorkerLock);
    1037             : }
    1038             : 
    1039             : /*
    1040             :  * Set the last-start time for the subscription.
    1041             :  */
    1042             : static void
    1043         292 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
    1044             : {
    1045             :     LauncherLastStartTimesEntry *entry;
    1046             :     bool        found;
    1047             : 
    1048         292 :     logicalrep_launcher_attach_dshmem();
    1049             : 
    1050         292 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
    1051         292 :     entry->last_start_time = start_time;
    1052         292 :     dshash_release_lock(last_start_times, entry);
    1053         292 : }
    1054             : 
    1055             : /*
    1056             :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1057             :  */
    1058             : static TimestampTz
    1059         416 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1060             : {
    1061             :     LauncherLastStartTimesEntry *entry;
    1062             :     TimestampTz ret;
    1063             : 
    1064         416 :     logicalrep_launcher_attach_dshmem();
    1065             : 
    1066         416 :     entry = dshash_find(last_start_times, &subid, false);
    1067         416 :     if (entry == NULL)
    1068         248 :         return 0;
    1069             : 
    1070         168 :     ret = entry->last_start_time;
    1071         168 :     dshash_release_lock(last_start_times, entry);
    1072             : 
    1073         168 :     return ret;
    1074             : }
    1075             : 
    1076             : /*
    1077             :  * Remove the last-start-time entry for the subscription, if one exists.
    1078             :  *
    1079             :  * This has two use-cases: to remove the entry related to a subscription
    1080             :  * that's been deleted or disabled (just to avoid leaking shared memory),
    1081             :  * and to allow immediate restart of an apply worker that has exited
    1082             :  * due to subscription parameter changes.
    1083             :  */
    1084             : void
    1085         302 : ApplyLauncherForgetWorkerStartTime(Oid subid)
    1086             : {
    1087         302 :     logicalrep_launcher_attach_dshmem();
    1088             : 
    1089         302 :     (void) dshash_delete_key(last_start_times, &subid);
    1090         302 : }
    1091             : 
    1092             : /*
    1093             :  * Wakeup the launcher on commit if requested.
    1094             :  */
    1095             : void
    1096     1036470 : AtEOXact_ApplyLauncher(bool isCommit)
    1097             : {
    1098     1036470 :     if (isCommit)
    1099             :     {
    1100      986828 :         if (on_commit_launcher_wakeup)
    1101         268 :             ApplyLauncherWakeup();
    1102             :     }
    1103             : 
    1104     1036470 :     on_commit_launcher_wakeup = false;
    1105     1036470 : }
    1106             : 
    1107             : /*
    1108             :  * Request wakeup of the launcher on commit of the transaction.
    1109             :  *
    1110             :  * This is used to send launcher signal to stop sleeping and process the
    1111             :  * subscriptions when current transaction commits. Should be used when new
    1112             :  * tuple was added to the pg_subscription catalog.
    1113             : */
    1114             : void
    1115         268 : ApplyLauncherWakeupAtCommit(void)
    1116             : {
    1117         268 :     if (!on_commit_launcher_wakeup)
    1118         268 :         on_commit_launcher_wakeup = true;
    1119         268 : }
    1120             : 
    1121             : static void
    1122        1134 : ApplyLauncherWakeup(void)
    1123             : {
    1124        1134 :     if (LogicalRepCtx->launcher_pid != 0)
    1125        1098 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1126        1134 : }
    1127             : 
    1128             : /*
    1129             :  * Main loop for the apply launcher process.
    1130             :  */
    1131             : void
    1132         812 : ApplyLauncherMain(Datum main_arg)
    1133             : {
    1134         812 :     ereport(DEBUG1,
    1135             :             (errmsg_internal("logical replication launcher started")));
    1136             : 
    1137         812 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1138             : 
    1139             :     Assert(LogicalRepCtx->launcher_pid == 0);
    1140         812 :     LogicalRepCtx->launcher_pid = MyProcPid;
    1141             : 
    1142             :     /* Establish signal handlers. */
    1143         812 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1144         812 :     pqsignal(SIGTERM, die);
    1145         812 :     BackgroundWorkerUnblockSignals();
    1146             : 
    1147             :     /*
    1148             :      * Establish connection to nailed catalogs (we only ever access
    1149             :      * pg_subscription).
    1150             :      */
    1151         812 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1152             : 
    1153             :     /* Enter main loop */
    1154             :     for (;;)
    1155        4440 :     {
    1156             :         int         rc;
    1157             :         List       *sublist;
    1158             :         ListCell   *lc;
    1159             :         MemoryContext subctx;
    1160             :         MemoryContext oldctx;
    1161        5252 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1162             : 
    1163        5252 :         CHECK_FOR_INTERRUPTS();
    1164             : 
    1165             :         /* Use temporary context to avoid leaking memory across cycles. */
    1166        5248 :         subctx = AllocSetContextCreate(TopMemoryContext,
    1167             :                                        "Logical Replication Launcher sublist",
    1168             :                                        ALLOCSET_DEFAULT_SIZES);
    1169        5248 :         oldctx = MemoryContextSwitchTo(subctx);
    1170             : 
    1171             :         /* Start any missing workers for enabled subscriptions. */
    1172        5248 :         sublist = get_subscription_list();
    1173        6526 :         foreach(lc, sublist)
    1174             :         {
    1175        1278 :             Subscription *sub = (Subscription *) lfirst(lc);
    1176             :             LogicalRepWorker *w;
    1177             :             TimestampTz last_start;
    1178             :             TimestampTz now;
    1179             :             long        elapsed;
    1180             : 
    1181        1278 :             if (!sub->enabled)
    1182          56 :                 continue;
    1183             : 
    1184        1222 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1185        1222 :             w = logicalrep_worker_find(sub->oid, InvalidOid, false);
    1186        1222 :             LWLockRelease(LogicalRepWorkerLock);
    1187             : 
    1188        1222 :             if (w != NULL)
    1189         806 :                 continue;       /* worker is running already */
    1190             : 
    1191             :             /*
    1192             :              * If the worker is eligible to start now, launch it.  Otherwise,
    1193             :              * adjust wait_time so that we'll wake up as soon as it can be
    1194             :              * started.
    1195             :              *
    1196             :              * Each subscription's apply worker can only be restarted once per
    1197             :              * wal_retrieve_retry_interval, so that errors do not cause us to
    1198             :              * repeatedly restart the worker as fast as possible.  In cases
    1199             :              * where a restart is expected (e.g., subscription parameter
    1200             :              * changes), another process should remove the last-start entry
    1201             :              * for the subscription so that the worker can be restarted
    1202             :              * without waiting for wal_retrieve_retry_interval to elapse.
    1203             :              */
    1204         416 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1205         416 :             now = GetCurrentTimestamp();
    1206         416 :             if (last_start == 0 ||
    1207         168 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1208             :             {
    1209         292 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1210         292 :                 if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
    1211         292 :                                               sub->dbid, sub->oid, sub->name,
    1212             :                                               sub->owner, InvalidOid,
    1213             :                                               DSM_HANDLE_INVALID))
    1214             :                 {
    1215             :                     /*
    1216             :                      * We get here either if we failed to launch a worker
    1217             :                      * (perhaps for resource-exhaustion reasons) or if we
    1218             :                      * launched one but it immediately quit.  Either way, it
    1219             :                      * seems appropriate to try again after
    1220             :                      * wal_retrieve_retry_interval.
    1221             :                      */
    1222           0 :                     wait_time = Min(wait_time,
    1223             :                                     wal_retrieve_retry_interval);
    1224             :                 }
    1225             :             }
    1226             :             else
    1227             :             {
    1228         124 :                 wait_time = Min(wait_time,
    1229             :                                 wal_retrieve_retry_interval - elapsed);
    1230             :             }
    1231             :         }
    1232             : 
    1233             :         /* Switch back to original memory context. */
    1234        5248 :         MemoryContextSwitchTo(oldctx);
    1235             :         /* Clean the temporary memory. */
    1236        5248 :         MemoryContextDelete(subctx);
    1237             : 
    1238             :         /* Wait for more work. */
    1239        5248 :         rc = WaitLatch(MyLatch,
    1240             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1241             :                        wait_time,
    1242             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1243             : 
    1244        5242 :         if (rc & WL_LATCH_SET)
    1245             :         {
    1246        5218 :             ResetLatch(MyLatch);
    1247        5218 :             CHECK_FOR_INTERRUPTS();
    1248             :         }
    1249             : 
    1250        4440 :         if (ConfigReloadPending)
    1251             :         {
    1252          72 :             ConfigReloadPending = false;
    1253          72 :             ProcessConfigFile(PGC_SIGHUP);
    1254             :         }
    1255             :     }
    1256             : 
    1257             :     /* Not reachable */
    1258             : }
    1259             : 
    1260             : /*
    1261             :  * Is current process the logical replication launcher?
    1262             :  */
    1263             : bool
    1264        1018 : IsLogicalLauncher(void)
    1265             : {
    1266        1018 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1267             : }
    1268             : 
    1269             : /*
    1270             :  * Return the pid of the leader apply worker if the given pid is the pid of a
    1271             :  * parallel apply worker, otherwise, return InvalidPid.
    1272             :  */
    1273             : pid_t
    1274        1808 : GetLeaderApplyWorkerPid(pid_t pid)
    1275             : {
    1276        1808 :     int         leader_pid = InvalidPid;
    1277             :     int         i;
    1278             : 
    1279        1808 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1280             : 
    1281        9040 :     for (i = 0; i < max_logical_replication_workers; i++)
    1282             :     {
    1283        7232 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1284             : 
    1285        7232 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
    1286             :         {
    1287           0 :             leader_pid = w->leader_pid;
    1288           0 :             break;
    1289             :         }
    1290             :     }
    1291             : 
    1292        1808 :     LWLockRelease(LogicalRepWorkerLock);
    1293             : 
    1294        1808 :     return leader_pid;
    1295             : }
    1296             : 
    1297             : /*
    1298             :  * Returns state of the subscriptions.
    1299             :  */
    1300             : Datum
    1301           2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1302             : {
    1303             : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
    1304           2 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1305             :     int         i;
    1306           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1307             : 
    1308           2 :     InitMaterializedSRF(fcinfo, 0);
    1309             : 
    1310             :     /* Make sure we get consistent view of the workers. */
    1311           2 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1312             : 
    1313          10 :     for (i = 0; i < max_logical_replication_workers; i++)
    1314             :     {
    1315             :         /* for each row */
    1316           8 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1317           8 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1318             :         int         worker_pid;
    1319             :         LogicalRepWorker worker;
    1320             : 
    1321           8 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1322             :                sizeof(LogicalRepWorker));
    1323           8 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1324           4 :             continue;
    1325             : 
    1326           4 :         if (OidIsValid(subid) && worker.subid != subid)
    1327           0 :             continue;
    1328             : 
    1329           4 :         worker_pid = worker.proc->pid;
    1330             : 
    1331           4 :         values[0] = ObjectIdGetDatum(worker.subid);
    1332           4 :         if (isTablesyncWorker(&worker))
    1333           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1334             :         else
    1335           4 :             nulls[1] = true;
    1336           4 :         values[2] = Int32GetDatum(worker_pid);
    1337             : 
    1338           4 :         if (isParallelApplyWorker(&worker))
    1339           0 :             values[3] = Int32GetDatum(worker.leader_pid);
    1340             :         else
    1341           4 :             nulls[3] = true;
    1342             : 
    1343           4 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
    1344           2 :             nulls[4] = true;
    1345             :         else
    1346           2 :             values[4] = LSNGetDatum(worker.last_lsn);
    1347           4 :         if (worker.last_send_time == 0)
    1348           0 :             nulls[5] = true;
    1349             :         else
    1350           4 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
    1351           4 :         if (worker.last_recv_time == 0)
    1352           0 :             nulls[6] = true;
    1353             :         else
    1354           4 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1355           4 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1356           2 :             nulls[7] = true;
    1357             :         else
    1358           2 :             values[7] = LSNGetDatum(worker.reply_lsn);
    1359           4 :         if (worker.reply_time == 0)
    1360           0 :             nulls[8] = true;
    1361             :         else
    1362           4 :             values[8] = TimestampTzGetDatum(worker.reply_time);
    1363             : 
    1364           4 :         switch (worker.type)
    1365             :         {
    1366           4 :             case WORKERTYPE_APPLY:
    1367           4 :                 values[9] = CStringGetTextDatum("apply");
    1368           4 :                 break;
    1369           0 :             case WORKERTYPE_PARALLEL_APPLY:
    1370           0 :                 values[9] = CStringGetTextDatum("parallel apply");
    1371           0 :                 break;
    1372           0 :             case WORKERTYPE_TABLESYNC:
    1373           0 :                 values[9] = CStringGetTextDatum("table synchronization");
    1374           0 :                 break;
    1375           0 :             case WORKERTYPE_UNKNOWN:
    1376             :                 /* Should never happen. */
    1377           0 :                 elog(ERROR, "unknown worker type");
    1378             :         }
    1379             : 
    1380           4 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1381             :                              values, nulls);
    1382             : 
    1383             :         /*
    1384             :          * If only a single subscription was requested, and we found it,
    1385             :          * break.
    1386             :          */
    1387           4 :         if (OidIsValid(subid))
    1388           0 :             break;
    1389             :     }
    1390             : 
    1391           2 :     LWLockRelease(LogicalRepWorkerLock);
    1392             : 
    1393           2 :     return (Datum) 0;
    1394             : }

Generated by: LCOV version 1.16