LCOV - code coverage report
File: src/backend/replication/logical/launcher.c (source / functions)
Test: PostgreSQL 19devel              Lines:     487 of 545 hit  (89.4 %)
Date: 2025-10-10 10:17:52             Functions:  36 of  36 hit (100.0 %)
Legend: source lines are annotated with their hit counts; lines with no count were never executed

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
      11             :  *    This module contains the logical replication worker launcher which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "access/heapam.h"
      21             : #include "access/htup.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/pg_subscription.h"
      26             : #include "catalog/pg_subscription_rel.h"
      27             : #include "funcapi.h"
      28             : #include "lib/dshash.h"
      29             : #include "miscadmin.h"
      30             : #include "pgstat.h"
      31             : #include "postmaster/bgworker.h"
      32             : #include "postmaster/interrupt.h"
      33             : #include "replication/logicallauncher.h"
      34             : #include "replication/origin.h"
      35             : #include "replication/slot.h"
      36             : #include "replication/walreceiver.h"
      37             : #include "replication/worker_internal.h"
      38             : #include "storage/ipc.h"
      39             : #include "storage/proc.h"
      40             : #include "storage/procarray.h"
      41             : #include "tcop/tcopprot.h"
      42             : #include "utils/builtins.h"
      43             : #include "utils/memutils.h"
      44             : #include "utils/pg_lsn.h"
      45             : #include "utils/snapmgr.h"
      46             : #include "utils/syscache.h"
      47             : 
      48             : /* max sleep time between cycles (3min) */
      49             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      50             : 
      51             : /* GUC variables */
      52             : int         max_logical_replication_workers = 4;
      53             : int         max_sync_workers_per_subscription = 2;
      54             : int         max_parallel_apply_workers_per_subscription = 2;
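                     : 
                     : /*
                     :  * Illustrative only (not part of launcher.c): these limits are the
                     :  * identically named settings in postgresql.conf; the values below are
                     :  * simply the built-in defaults from above.
                     :  *
                     :  *    max_logical_replication_workers = 4
                     :  *    max_sync_workers_per_subscription = 2
                     :  *    max_parallel_apply_workers_per_subscription = 2
                     :  *
                     :  * These workers come out of the pool sized by max_worker_processes,
                     :  * so raising the first setting may require raising that one too.
                     :  */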
      55             : 
      56             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      57             : 
      58             : typedef struct LogicalRepCtxStruct
      59             : {
      60             :     /* Supervisor process. */
      61             :     pid_t       launcher_pid;
      62             : 
      63             :     /* Hash table holding last start times of subscriptions' apply workers. */
      64             :     dsa_handle  last_start_dsa;
      65             :     dshash_table_handle last_start_dsh;
      66             : 
      67             :     /* Background workers. */
      68             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      69             : } LogicalRepCtxStruct;
      70             : 
      71             : static LogicalRepCtxStruct *LogicalRepCtx;
      72             : 
      73             : /* an entry in the last-start-times shared hash table */
      74             : typedef struct LauncherLastStartTimesEntry
      75             : {
      76             :     Oid         subid;          /* OID of logrep subscription (hash key) */
      77             :     TimestampTz last_start_time;    /* last time its apply worker was started */
      78             : } LauncherLastStartTimesEntry;
      79             : 
      80             : /* parameters for the last-start-times shared hash table */
      81             : static const dshash_parameters dsh_params = {
      82             :     sizeof(Oid),
      83             :     sizeof(LauncherLastStartTimesEntry),
      84             :     dshash_memcmp,
      85             :     dshash_memhash,
      86             :     dshash_memcpy,
      87             :     LWTRANCHE_LAUNCHER_HASH
      88             : };
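                     : 
                     : /*
                     :  * For readability (a sketch, not in the source): the positional
                     :  * initializer above corresponds to the dshash_parameters fields
                     :  * roughly as
                     :  *
                     :  *    .key_size = sizeof(Oid),
                     :  *    .entry_size = sizeof(LauncherLastStartTimesEntry),
                     :  *    .compare_function = dshash_memcmp,
                     :  *    .hash_function = dshash_memhash,
                     :  *    .copy_function = dshash_memcpy,
                     :  *    .tranche_id = LWTRANCHE_LAUNCHER_HASH
                     :  */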
      89             : 
      90             : static dsa_area *last_start_times_dsa = NULL;
      91             : static dshash_table *last_start_times = NULL;
      92             : 
      93             : static bool on_commit_launcher_wakeup = false;
      94             : 
      95             : 
      96             : static void logicalrep_launcher_onexit(int code, Datum arg);
      97             : static void logicalrep_worker_onexit(int code, Datum arg);
      98             : static void logicalrep_worker_detach(void);
      99             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
     100             : static int  logicalrep_pa_worker_count(Oid subid);
     101             : static void logicalrep_launcher_attach_dshmem(void);
     102             : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
     103             : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
     104             : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
     105             : static bool acquire_conflict_slot_if_exists(void);
     106             : static void update_conflict_slot_xmin(TransactionId new_xmin);
     107             : static void init_conflict_slot_xmin(void);
     108             : 
     109             : 
     110             : /*
     111             :  * Load the list of subscriptions.
     112             :  *
     113             :  * Only the fields of interest to the worker start/stop functions are filled
     114             :  * in for each subscription.
     115             :  */
     116             : static List *
     117        5742 : get_subscription_list(void)
     118             : {
     119        5742 :     List       *res = NIL;
     120             :     Relation    rel;
     121             :     TableScanDesc scan;
     122             :     HeapTuple   tup;
     123             :     MemoryContext resultcxt;
     124             : 
     125             :     /* This is the context that we will allocate our output data in */
     126        5742 :     resultcxt = CurrentMemoryContext;
     127             : 
     128             :     /*
     129             :      * Start a transaction so we can access pg_subscription.
     130             :      */
     131        5742 :     StartTransactionCommand();
     132             : 
     133        5742 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     134        5742 :     scan = table_beginscan_catalog(rel, 0, NULL);
     135             : 
     136        7468 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     137             :     {
     138        1726 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     139             :         Subscription *sub;
     140             :         MemoryContext oldcxt;
     141             : 
     142             :         /*
     143             :          * Allocate our results in the caller's context, not the
     144             :          * transaction's. We do this inside the loop, and restore the original
     145             :          * context at the end, so that leaky things like heap_getnext() are
     146             :          * not called in a potentially long-lived context.
     147             :          */
     148        1726 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     149             : 
     150        1726 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     151        1726 :         sub->oid = subform->oid;
     152        1726 :         sub->dbid = subform->subdbid;
     153        1726 :         sub->owner = subform->subowner;
     154        1726 :         sub->enabled = subform->subenabled;
     155        1726 :         sub->name = pstrdup(NameStr(subform->subname));
     156        1726 :         sub->retaindeadtuples = subform->subretaindeadtuples;
     157        1726 :         sub->retentionactive = subform->subretentionactive;
     158             :         /* We don't fill fields we are not interested in. */
     159             : 
     160        1726 :         res = lappend(res, sub);
     161        1726 :         MemoryContextSwitchTo(oldcxt);
     162             :     }
     163             : 
     164        5742 :     table_endscan(scan);
     165        5742 :     table_close(rel, AccessShareLock);
     166             : 
     167        5742 :     CommitTransactionCommand();
     168             : 
     169        5742 :     return res;
     170             : }
     171             : 
     172             : /*
     173             :  * Wait for a background worker to start up and attach to the shmem context.
     174             :  *
     175             :  * This is only needed for cleaning up the shared memory in case the worker
     176             :  * fails to attach.
     177             :  *
     178             :  * Returns whether the attach was successful.
     179             :  */
     180             : static bool
     181         810 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     182             :                                uint16 generation,
     183             :                                BackgroundWorkerHandle *handle)
     184             : {
     185         810 :     bool        result = false;
     186         810 :     bool        dropped_latch = false;
     187             : 
     188             :     for (;;)
     189        2288 :     {
     190             :         BgwHandleStatus status;
     191             :         pid_t       pid;
     192             :         int         rc;
     193             : 
     194        3098 :         CHECK_FOR_INTERRUPTS();
     195             : 
     196        3098 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     197             : 
     198             :         /* Worker either died or has started. Return false if died. */
     199        3098 :         if (!worker->in_use || worker->proc)
     200             :         {
     201         808 :             result = worker->in_use;
     202         808 :             LWLockRelease(LogicalRepWorkerLock);
     203         808 :             break;
     204             :         }
     205             : 
     206        2290 :         LWLockRelease(LogicalRepWorkerLock);
     207             : 
     208             :         /* Check if worker has died before attaching, and clean up after it. */
     209        2290 :         status = GetBackgroundWorkerPid(handle, &pid);
     210             : 
     211        2290 :         if (status == BGWH_STOPPED)
     212             :         {
     213           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     214             :             /* Ensure that this was indeed the worker we waited for. */
     215           0 :             if (generation == worker->generation)
     216           0 :                 logicalrep_worker_cleanup(worker);
     217           0 :             LWLockRelease(LogicalRepWorkerLock);
     218           0 :             break;              /* result is already false */
     219             :         }
     220             : 
     221             :         /*
     222             :          * We need a timeout because we generally aren't notified via the
     223             :          * latch about the worker attaching, but we don't expect to wait long.
     224             :          */
     225        2290 :         rc = WaitLatch(MyLatch,
     226             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     227             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     228             : 
     229        2290 :         if (rc & WL_LATCH_SET)
     230             :         {
     231         952 :             ResetLatch(MyLatch);
     232         952 :             CHECK_FOR_INTERRUPTS();
     233         950 :             dropped_latch = true;
     234             :         }
     235             :     }
     236             : 
     237             :     /*
     238             :      * If we had to clear a latch event in order to wait, be sure to restore
     239             :      * it before exiting.  Otherwise caller may miss events.
     240             :      */
     241         808 :     if (dropped_latch)
     242         808 :         SetLatch(MyLatch);
     243             : 
     244         808 :     return result;
     245             : }
     246             : 
     247             : /*
     248             :  * Walks the workers array and searches for one that matches the given
     249             :  * subscription id and relid.
     250             :  *
     251             :  * We are only interested in the leader apply worker or table sync worker.
     252             :  */
     253             : LogicalRepWorker *
     254        6016 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     255             : {
     256             :     int         i;
     257        6016 :     LogicalRepWorker *res = NULL;
     258             : 
     259             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     260             : 
     261             :     /* Search for attached worker for a given subscription id. */
     262       18254 :     for (i = 0; i < max_logical_replication_workers; i++)
     263             :     {
     264       15950 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     265             : 
     266             :         /* Skip parallel apply workers. */
     267       15950 :         if (isParallelApplyWorker(w))
     268           0 :             continue;
     269             : 
     270       15950 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     271        3712 :             (!only_running || w->proc))
     272             :         {
     273        3712 :             res = w;
     274        3712 :             break;
     275             :         }
     276             :     }
     277             : 
     278        6016 :     return res;
     279             : }
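                     : 
                     : /*
                     :  * Call-pattern sketch (illustrative; it mirrors logicalrep_worker_wakeup()
                     :  * below): the caller must hold LogicalRepWorkerLock, because the returned
                     :  * pointer addresses shared memory that could otherwise change underneath
                     :  * us.
                     :  *
                     :  *    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                     :  *    worker = logicalrep_worker_find(subid, InvalidOid, true);
                     :  *    if (worker)
                     :  *        logicalrep_worker_wakeup_ptr(worker);
                     :  *    LWLockRelease(LogicalRepWorkerLock);
                     :  */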
     280             : 
     281             : /*
     282             :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     283             :  * the subscription, instead of just one.
     284             :  */
     285             : List *
     286        1282 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
     287             : {
     288             :     int         i;
     289        1282 :     List       *res = NIL;
     290             : 
     291        1282 :     if (acquire_lock)
     292         238 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     293             : 
     294             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     295             : 
     296             :     /* Search for attached worker for a given subscription id. */
     297        6650 :     for (i = 0; i < max_logical_replication_workers; i++)
     298             :     {
     299        5368 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     300             : 
     301        5368 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     302         904 :             res = lappend(res, w);
     303             :     }
     304             : 
     305        1282 :     if (acquire_lock)
     306         238 :         LWLockRelease(LogicalRepWorkerLock);
     307             : 
     308        1282 :     return res;
     309             : }
     310             : 
     311             : /*
     312             :  * Start new logical replication background worker, if possible.
     313             :  *
     314             :  * Returns true on success, false on failure.
     315             :  */
     316             : bool
     317         810 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
     318             :                          Oid dbid, Oid subid, const char *subname, Oid userid,
     319             :                          Oid relid, dsm_handle subworker_dsm,
     320             :                          bool retain_dead_tuples)
     321             : {
     322             :     BackgroundWorker bgw;
     323             :     BackgroundWorkerHandle *bgw_handle;
     324             :     uint16      generation;
     325             :     int         i;
     326         810 :     int         slot = 0;
     327         810 :     LogicalRepWorker *worker = NULL;
     328             :     int         nsyncworkers;
     329             :     int         nparallelapplyworkers;
     330             :     TimestampTz now;
     331         810 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
     332         810 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
     333             : 
     334             :     /*----------
     335             :      * Sanity checks:
     336             :      * - must be valid worker type
     337             :      * - tablesync workers are only ones to have relid
     338             :      * - parallel apply worker is the only kind of subworker
     339             :      * - The replication slot used in conflict detection is created when
     340             :      *   retain_dead_tuples is enabled
     341             :      */
     342             :     Assert(wtype != WORKERTYPE_UNKNOWN);
     343             :     Assert(is_tablesync_worker == OidIsValid(relid));
     344             :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
     345             :     Assert(!retain_dead_tuples || MyReplicationSlot);
     346             : 
     347         810 :     ereport(DEBUG1,
     348             :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     349             :                              subname)));
     350             : 
     351             :     /* Report this after the initial starting message for consistency. */
     352         810 :     if (max_active_replication_origins == 0)
     353           0 :         ereport(ERROR,
     354             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     355             :                  errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
     356             : 
     357             :     /*
     358             :      * We need to modify the shared memory under the lock so that we have a
     359             :      * consistent view.
     360             :      */
     361         810 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     362             : 
     363         810 : retry:
     364             :     /* Find unused worker slot. */
     365        1410 :     for (i = 0; i < max_logical_replication_workers; i++)
     366             :     {
     367        1410 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     368             : 
     369        1410 :         if (!w->in_use)
     370             :         {
     371         810 :             worker = w;
     372         810 :             slot = i;
     373         810 :             break;
     374             :         }
     375             :     }
     376             : 
     377         810 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     378             : 
     379         810 :     now = GetCurrentTimestamp();
     380             : 
     381             :     /*
     382             :      * If we didn't find a free slot, try to do garbage collection.  We do
     383             :      * this because, if a worker failed to start up and its parent crashed
     384             :      * while waiting, the in_use state was never cleared.
     385             :      */
     386         810 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     387             :     {
     388           0 :         bool        did_cleanup = false;
     389             : 
     390           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     391             :         {
     392           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     393             : 
     394             :             /*
     395             :              * If the worker was marked in use but didn't manage to attach in
     396             :              * time, clean it up.
     397             :              */
     398           0 :             if (w->in_use && !w->proc &&
     399           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     400             :                                            wal_receiver_timeout))
     401             :             {
     402           0 :                 elog(WARNING,
     403             :                      "logical replication worker for subscription %u took too long to start; canceled",
     404             :                      w->subid);
     405             : 
     406           0 :                 logicalrep_worker_cleanup(w);
     407           0 :                 did_cleanup = true;
     408             :             }
     409             :         }
     410             : 
     411           0 :         if (did_cleanup)
     412           0 :             goto retry;
     413             :     }
     414             : 
     415             :     /*
     416             :      * We don't allow to invoke more sync workers once we have reached the
     417             :      * We don't allow launching more sync workers once we have reached the
     418             :      * might get here because of an otherwise harmless race condition.
     419             :      */
     420         810 :     if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
     421             :     {
     422           0 :         LWLockRelease(LogicalRepWorkerLock);
     423           0 :         return false;
     424             :     }
     425             : 
     426         810 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     427             : 
     428             :     /*
     429             :      * Return false if the number of parallel apply workers reached the limit
     430             :      * per subscription.
     431             :      */
     432         810 :     if (is_parallel_apply_worker &&
     433          20 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     434             :     {
     435           0 :         LWLockRelease(LogicalRepWorkerLock);
     436           0 :         return false;
     437             :     }
     438             : 
     439             :     /*
     440             :      * However, if there are no more free worker slots, inform the user
     441             :      * before exiting.
     442             :      */
     443         810 :     if (worker == NULL)
     444             :     {
     445           0 :         LWLockRelease(LogicalRepWorkerLock);
     446           0 :         ereport(WARNING,
     447             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     448             :                  errmsg("out of logical replication worker slots"),
     449             :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
     450           0 :         return false;
     451             :     }
     452             : 
     453             :     /* Prepare the worker slot. */
     454         810 :     worker->type = wtype;
     455         810 :     worker->launch_time = now;
     456         810 :     worker->in_use = true;
     457         810 :     worker->generation++;
     458         810 :     worker->proc = NULL;
     459         810 :     worker->dbid = dbid;
     460         810 :     worker->userid = userid;
     461         810 :     worker->subid = subid;
     462         810 :     worker->relid = relid;
     463         810 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     464         810 :     worker->relstate_lsn = InvalidXLogRecPtr;
     465         810 :     worker->stream_fileset = NULL;
     466         810 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     467         810 :     worker->parallel_apply = is_parallel_apply_worker;
     468         810 :     worker->oldest_nonremovable_xid = retain_dead_tuples
     469           4 :         ? MyReplicationSlot->data.xmin
     470         810 :         : InvalidTransactionId;
     471         810 :     worker->last_lsn = InvalidXLogRecPtr;
     472         810 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     473         810 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     474         810 :     worker->reply_lsn = InvalidXLogRecPtr;
     475         810 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     476             : 
     477             :     /* Before releasing lock, remember generation for future identification. */
     478         810 :     generation = worker->generation;
     479             : 
     480         810 :     LWLockRelease(LogicalRepWorkerLock);
     481             : 
     482             :     /* Register the new dynamic worker. */
     483         810 :     memset(&bgw, 0, sizeof(bgw));
     484         810 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     485             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     486         810 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     487         810 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     488             : 
     489         810 :     switch (worker->type)
     490             :     {
     491         402 :         case WORKERTYPE_APPLY:
     492         402 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     493         402 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     494             :                      "logical replication apply worker for subscription %u",
     495             :                      subid);
     496         402 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
     497         402 :             break;
     498             : 
     499          20 :         case WORKERTYPE_PARALLEL_APPLY:
     500          20 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     501          20 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     502             :                      "logical replication parallel apply worker for subscription %u",
     503             :                      subid);
     504          20 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     505             : 
     506          20 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     507          20 :             break;
     508             : 
     509         388 :         case WORKERTYPE_TABLESYNC:
     510         388 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
     511         388 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     512             :                      "logical replication tablesync worker for subscription %u sync %u",
     513             :                      subid,
     514             :                      relid);
     515         388 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
     516         388 :             break;
     517             : 
     518           0 :         case WORKERTYPE_UNKNOWN:
     519             :             /* Should never happen. */
     520           0 :             elog(ERROR, "unknown worker type");
     521             :     }
     522             : 
     523         810 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     524         810 :     bgw.bgw_notify_pid = MyProcPid;
     525         810 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     526             : 
     527         810 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     528             :     {
     529             :         /* Failed to start worker, so clean up the worker slot. */
     530           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     531             :         Assert(generation == worker->generation);
     532           0 :         logicalrep_worker_cleanup(worker);
     533           0 :         LWLockRelease(LogicalRepWorkerLock);
     534             : 
     535           0 :         ereport(WARNING,
     536             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     537             :                  errmsg("out of background worker slots"),
     538             :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
     539           0 :         return false;
     540             :     }
     541             : 
     542             :     /* Now wait until it attaches. */
     543         810 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     544             : }
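                     : 
                     : /*
                     :  * Caller sketch (illustrative; the real call sites are in the launcher
                     :  * main loop and tablesync code, outside this excerpt).  Launching the
                     :  * leader apply worker for a subscription might look like:
                     :  *
                     :  *    logicalrep_worker_launch(WORKERTYPE_APPLY,
                     :  *                             sub->dbid, sub->oid, sub->name,
                     :  *                             sub->owner, InvalidOid,
                     :  *                             DSM_HANDLE_INVALID, false);
                     :  *
                     :  * Per the assertions above, relid is valid only for tablesync workers,
                     :  * and subworker_dsm only for parallel apply workers.
                     :  */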
     545             : 
     546             : /*
     547             :  * Internal function to stop the worker and wait until it detaches from the
     548             :  * slot.
     549             :  */
     550             : static void
     551         164 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
     552             : {
     553             :     uint16      generation;
     554             : 
     555             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
     556             : 
     557             :     /*
     558             :      * Remember our worker's generation so we can later check whether what
     559             :      * we see is still the same worker.
     560             :      */
     561         164 :     generation = worker->generation;
     562             : 
     563             :     /*
     564             :      * If we found a worker but it does not have proc set then it is still
     565             :      * starting up; wait for it to finish starting and then kill it.
     566             :      */
     567         168 :     while (worker->in_use && !worker->proc)
     568             :     {
     569             :         int         rc;
     570             : 
     571          10 :         LWLockRelease(LogicalRepWorkerLock);
     572             : 
     573             :         /* Wait a bit --- we don't expect to have to wait long. */
     574          10 :         rc = WaitLatch(MyLatch,
     575             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     576             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     577             : 
     578          10 :         if (rc & WL_LATCH_SET)
     579             :         {
     580           0 :             ResetLatch(MyLatch);
     581           0 :             CHECK_FOR_INTERRUPTS();
     582             :         }
     583             : 
     584             :         /* Recheck worker status. */
     585          10 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     586             : 
     587             :         /*
     588             :          * Check whether the worker slot is no longer used, which would mean
     589             :          * that the worker has exited, or whether the worker generation is
     590             :          * different, meaning that a different worker has taken the slot.
     591             :          */
     592          10 :         if (!worker->in_use || worker->generation != generation)
     593           0 :             return;
     594             : 
     595             :         /* Worker has assigned proc, so it has started. */
     596          10 :         if (worker->proc)
     597           6 :             break;
     598             :     }
     599             : 
     600             :     /* Now terminate the worker ... */
     601         164 :     kill(worker->proc->pid, signo);
     602             : 
     603             :     /* ... and wait for it to die. */
     604             :     for (;;)
     605         232 :     {
     606             :         int         rc;
     607             : 
     608             :         /* is it gone? */
     609         396 :         if (!worker->proc || worker->generation != generation)
     610             :             break;
     611             : 
     612         232 :         LWLockRelease(LogicalRepWorkerLock);
     613             : 
     614             :         /* Wait a bit --- we don't expect to have to wait long. */
     615         232 :         rc = WaitLatch(MyLatch,
     616             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     617             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     618             : 
     619         232 :         if (rc & WL_LATCH_SET)
     620             :         {
     621          96 :             ResetLatch(MyLatch);
     622          96 :             CHECK_FOR_INTERRUPTS();
     623             :         }
     624             : 
     625         232 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     626             :     }
     627             : }
     628             : 
     629             : /*
     630             :  * Stop the logical replication worker for subid/relid, if any.
     631             :  */
     632             : void
     633         192 : logicalrep_worker_stop(Oid subid, Oid relid)
     634             : {
     635             :     LogicalRepWorker *worker;
     636             : 
     637         192 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     638             : 
     639         192 :     worker = logicalrep_worker_find(subid, relid, false);
     640             : 
     641         192 :     if (worker)
     642             :     {
     643             :         Assert(!isParallelApplyWorker(worker));
     644         148 :         logicalrep_worker_stop_internal(worker, SIGTERM);
     645             :     }
     646             : 
     647         192 :     LWLockRelease(LogicalRepWorkerLock);
     648         192 : }
     649             : 
     650             : /*
     651             :  * Stop the given logical replication parallel apply worker.
     652             :  *
     653             :  * Note that the function sends SIGUSR2 instead of SIGTERM to the parallel apply
     654             :  * worker so that the worker exits cleanly.
     655             :  */
     656             : void
     657          10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
     658             : {
     659             :     int         slot_no;
     660             :     uint16      generation;
     661             :     LogicalRepWorker *worker;
     662             : 
     663          10 :     SpinLockAcquire(&winfo->shared->mutex);
     664          10 :     generation = winfo->shared->logicalrep_worker_generation;
     665          10 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
     666          10 :     SpinLockRelease(&winfo->shared->mutex);
     667             : 
     668             :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
     669             : 
     670             :     /*
     671             :      * Detach from the error_mq_handle for the parallel apply worker before
     672             :      * stopping it. This prevents the leader apply worker from trying to
     673             :      * read from an error queue that the parallel apply worker may already
     674             :      * have detached.
     675             :      */
     676          10 :     if (winfo->error_mq_handle)
     677             :     {
     678          10 :         shm_mq_detach(winfo->error_mq_handle);
     679          10 :         winfo->error_mq_handle = NULL;
     680             :     }
     681             : 
     682          10 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     683             : 
     684          10 :     worker = &LogicalRepCtx->workers[slot_no];
     685             :     Assert(isParallelApplyWorker(worker));
     686             : 
     687             :     /*
     688             :      * Only stop the worker if the generation matches and the worker is alive.
     689             :      */
     690          10 :     if (worker->generation == generation && worker->proc)
     691          10 :         logicalrep_worker_stop_internal(worker, SIGUSR2);
     692             : 
     693          10 :     LWLockRelease(LogicalRepWorkerLock);
     694          10 : }
     695             : 
     696             : /*
     697             :  * Wake up (using latch) any logical replication worker for the specified sub/rel.
     698             :  */
     699             : void
     700         422 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     701             : {
     702             :     LogicalRepWorker *worker;
     703             : 
     704         422 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     705             : 
     706         422 :     worker = logicalrep_worker_find(subid, relid, true);
     707             : 
     708         422 :     if (worker)
     709         422 :         logicalrep_worker_wakeup_ptr(worker);
     710             : 
     711         422 :     LWLockRelease(LogicalRepWorkerLock);
     712         422 : }
     713             : 
     714             : /*
     715             :  * Wake up (using latch) the specified logical replication worker.
     716             :  *
     717             :  * Caller must hold lock, else worker->proc could change under us.
     718             :  */
     719             : void
     720        1286 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     721             : {
     722             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     723             : 
     724        1286 :     SetLatch(&worker->proc->procLatch);
     725        1286 : }
     726             : 
     727             : /*
     728             :  * Attach to a slot.
     729             :  */
     730             : void
     731        1026 : logicalrep_worker_attach(int slot)
     732             : {
     733             :     /* Block concurrent access. */
     734        1026 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     735             : 
     736             :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     737        1026 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     738             : 
     739        1026 :     if (!MyLogicalRepWorker->in_use)
     740             :     {
     741           0 :         LWLockRelease(LogicalRepWorkerLock);
     742           0 :         ereport(ERROR,
     743             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     744             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     745             :                         slot)));
     746             :     }
     747             : 
     748        1026 :     if (MyLogicalRepWorker->proc)
     749             :     {
     750           0 :         LWLockRelease(LogicalRepWorkerLock);
     751           0 :         ereport(ERROR,
     752             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     753             :                  errmsg("logical replication worker slot %d is already used by "
     754             :                         "another worker, cannot attach", slot)));
     755             :     }
     756             : 
     757        1026 :     MyLogicalRepWorker->proc = MyProc;
     758        1026 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     759             : 
     760        1026 :     LWLockRelease(LogicalRepWorkerLock);
     761        1026 : }
     762             : 
     763             : /*
     764             :  * Stop the parallel apply workers if any, and detach the leader apply worker
     765             :  * (cleans up the worker info).
     766             :  */
     767             : static void
     768        1026 : logicalrep_worker_detach(void)
     769             : {
     770             :     /* Stop the parallel apply workers. */
     771        1026 :     if (am_leader_apply_worker())
     772             :     {
     773             :         List       *workers;
     774             :         ListCell   *lc;
     775             : 
     776             :         /*
     777             :          * Detach from the error_mq_handle for all parallel apply workers
     778             :          * before terminating them. This prevents the leader apply worker from
     779             :          * receiving the worker termination message and writing it to the log
     780             :          * when the parallel worker has already done so.
     781             :          */
     782         614 :         pa_detach_all_error_mq();
     783             : 
     784         614 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     785             : 
     786         614 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
     787        1236 :         foreach(lc, workers)
     788             :         {
     789         622 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     790             : 
     791         622 :             if (isParallelApplyWorker(w))
     792           6 :                 logicalrep_worker_stop_internal(w, SIGTERM);
     793             :         }
     794             : 
     795         614 :         LWLockRelease(LogicalRepWorkerLock);
     796             : 
     797         614 :         list_free(workers);
     798             :     }
     799             : 
     800             :     /* Block concurrent access. */
     801        1026 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     802             : 
     803        1026 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     804             : 
     805        1026 :     LWLockRelease(LogicalRepWorkerLock);
     806        1026 : }
     807             : 
     808             : /*
     809             :  * Clean up worker info.
     810             :  */
     811             : static void
     812        1026 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     813             : {
     814             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     815             : 
     816        1026 :     worker->type = WORKERTYPE_UNKNOWN;
     817        1026 :     worker->in_use = false;
     818        1026 :     worker->proc = NULL;
     819        1026 :     worker->dbid = InvalidOid;
     820        1026 :     worker->userid = InvalidOid;
     821        1026 :     worker->subid = InvalidOid;
     822        1026 :     worker->relid = InvalidOid;
     823        1026 :     worker->leader_pid = InvalidPid;
     824        1026 :     worker->parallel_apply = false;
     825        1026 : }
     826             : 
     827             : /*
     828             :  * Cleanup function for logical replication launcher.
     829             :  *
     830             :  * Called on logical replication launcher exit.
     831             :  */
     832             : static void
     833         852 : logicalrep_launcher_onexit(int code, Datum arg)
     834             : {
     835         852 :     LogicalRepCtx->launcher_pid = 0;
     836         852 : }
     837             : 
     838             : /*
     839             :  * Cleanup function.
     840             :  *
     841             :  * Called on logical replication worker exit.
     842             :  */
     843             : static void
     844        1026 : logicalrep_worker_onexit(int code, Datum arg)
     845             : {
     846             :     /* Disconnect gracefully from the remote side. */
     847        1026 :     if (LogRepWorkerWalRcvConn)
     848         828 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
     849             : 
     850        1026 :     logicalrep_worker_detach();
     851             : 
     852             :     /* Cleanup fileset used for streaming transactions. */
     853        1026 :     if (MyLogicalRepWorker->stream_fileset != NULL)
     854          28 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     855             : 
     856             :     /*
     857             :      * Session level locks may be acquired outside of a transaction in
     858             :      * parallel apply mode and will not be released when the worker
     859             :      * terminates, so manually release all locks before the worker exits.
     860             :      *
     861             :      * The locks will be acquired once the worker is initialized.
     862             :      */
     863        1026 :     if (!InitializingApplyWorker)
     864         916 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
     865             : 
     866        1026 :     ApplyLauncherWakeup();
     867        1026 : }
     868             : 
     869             : /*
     870             :  * Count the number of registered (not necessarily running) sync workers
     871             :  * for a subscription.
     872             :  */
     873             : int
     874        2426 : logicalrep_sync_worker_count(Oid subid)
     875             : {
     876             :     int         i;
     877        2426 :     int         res = 0;
     878             : 
     879             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     880             : 
     881             :     /* Search for attached worker for a given subscription id. */
     882       12510 :     for (i = 0; i < max_logical_replication_workers; i++)
     883             :     {
     884       10084 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     885             : 
     886       10084 :         if (isTablesyncWorker(w) && w->subid == subid)
     887        2692 :             res++;
     888             :     }
     889             : 
     890        2426 :     return res;
     891             : }
     892             : 
     893             : /*
     894             :  * Count the number of registered (but not necessarily running) parallel apply
     895             :  * workers for a subscription.
     896             :  */
     897             : static int
     898         810 : logicalrep_pa_worker_count(Oid subid)
     899             : {
     900             :     int         i;
     901         810 :     int         res = 0;
     902             : 
     903             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     904             : 
     905             :     /*
     906             :      * Scan all attached parallel apply workers, only counting those which
     907             :      * have the given subscription id.
     908             :      */
     909        4282 :     for (i = 0; i < max_logical_replication_workers; i++)
     910             :     {
     911        3472 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     912             : 
     913        3472 :         if (isParallelApplyWorker(w) && w->subid == subid)
     914           4 :             res++;
     915             :     }
     916             : 
     917         810 :     return res;
     918             : }
     919             : 
     920             : /*
     921             :  * ApplyLauncherShmemSize
     922             :  *      Compute space needed for replication launcher shared memory
     923             :  */
     924             : Size
     925        8420 : ApplyLauncherShmemSize(void)
     926             : {
     927             :     Size        size;
     928             : 
     929             :     /*
     930             :      * Need the fixed struct and the array of LogicalRepWorker.
     931             :      */
     932        8420 :     size = sizeof(LogicalRepCtxStruct);
     933        8420 :     size = MAXALIGN(size);
     934        8420 :     size = add_size(size, mul_size(max_logical_replication_workers,
     935             :                                    sizeof(LogicalRepWorker)));
     936        8420 :     return size;
     937             : }
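                     : 
                     : /*
                     :  * Worked example (illustrative arithmetic, not from the source): with
                     :  * the default max_logical_replication_workers = 4, this returns
                     :  * MAXALIGN(sizeof(LogicalRepCtxStruct)) + 4 * sizeof(LogicalRepWorker),
                     :  * with add_size()/mul_size() trapping any overflow.
                     :  */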
     938             : 
     939             : /*
     940             :  * ApplyLauncherRegister
     941             :  *      Register a background worker running the logical replication launcher.
     942             :  */
     943             : void
     944        1746 : ApplyLauncherRegister(void)
     945             : {
     946             :     BackgroundWorker bgw;
     947             : 
     948             :     /*
     949             :      * The logical replication launcher is disabled during binary upgrades, to
     950             :      * prevent logical replication workers from running on the source cluster.
     951             :      * That could cause replication origins to move forward after having been
     952             :      * copied to the target cluster, potentially creating conflicts with the
     953             :      * copied data files.
     954             :      */
     955        1746 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
     956         106 :         return;
     957             : 
     958        1640 :     memset(&bgw, 0, sizeof(bgw));
     959        1640 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     960             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     961        1640 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     962        1640 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     963        1640 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     964        1640 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     965             :              "logical replication launcher");
     966        1640 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
     967             :              "logical replication launcher");
     968        1640 :     bgw.bgw_restart_time = 5;
     969        1640 :     bgw.bgw_notify_pid = 0;
     970        1640 :     bgw.bgw_main_arg = (Datum) 0;
     971             : 
     972        1640 :     RegisterBackgroundWorker(&bgw);
     973             : }
     974             : 
     975             : /*
     976             :  * ApplyLauncherShmemInit
     977             :  *      Allocate and initialize replication launcher shared memory
     978             :  */
     979             : void
     980        2180 : ApplyLauncherShmemInit(void)
     981             : {
     982             :     bool        found;
     983             : 
     984        2180 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     985        2180 :         ShmemInitStruct("Logical Replication Launcher Data",
     986             :                         ApplyLauncherShmemSize(),
     987             :                         &found);
     988             : 
     989        2180 :     if (!found)
     990             :     {
     991             :         int         slot;
     992             : 
     993        2180 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     994             : 
     995        2180 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
     996        2180 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
     997             : 
     998             :         /* Initialize memory and spin locks for each worker slot. */
     999       10826 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
    1000             :         {
    1001        8646 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
    1002             : 
    1003        8646 :             memset(worker, 0, sizeof(LogicalRepWorker));
    1004        8646 :             SpinLockInit(&worker->relmutex);
    1005             :         }
    1006             :     }
    1007        2180 : }
    1008             : 
    1009             : /*
    1010             :  * Initialize or attach to the dynamic shared hash table that stores the
    1011             :  * last-start times, if not already done.
    1012             :  * This must be called before accessing the table.
    1013             :  */
    1014             : static void
    1015        1488 : logicalrep_launcher_attach_dshmem(void)
    1016             : {
    1017             :     MemoryContext oldcontext;
    1018             : 
    1019             :     /* Quick exit if we already did this. */
    1020        1488 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
    1021        1378 :         last_start_times != NULL)
    1022        1014 :         return;
    1023             : 
    1024             :     /* Otherwise, use a lock to ensure only one process creates the table. */
    1025         474 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    1026             : 
    1027             :     /* Be sure any local memory allocated by DSA routines is persistent. */
    1028         474 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1029             : 
    1030         474 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
    1031             :     {
    1032             :         /* Initialize dynamic shared hash table for last-start times. */
    1033         110 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
    1034         110 :         dsa_pin(last_start_times_dsa);
    1035         110 :         dsa_pin_mapping(last_start_times_dsa);
    1036         110 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
    1037             : 
    1038             :         /* Store handles in shared memory for other backends to use. */
    1039         110 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
    1040         110 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
    1041             :     }
    1042         364 :     else if (!last_start_times)
    1043             :     {
    1044             :         /* Attach to existing dynamic shared hash table. */
    1045         364 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
    1046         364 :         dsa_pin_mapping(last_start_times_dsa);
    1047         364 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
    1048         364 :                                          LogicalRepCtx->last_start_dsh, NULL);
    1049             :     }
    1050             : 
    1051         474 :     MemoryContextSwitchTo(oldcontext);
    1052         474 :     LWLockRelease(LogicalRepWorkerLock);
    1053             : }
    1054             : 
    1055             : /*
    1056             :  * Set the last-start time for the subscription.
    1057             :  */
    1058             : static void
    1059         402 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
    1060             : {
    1061             :     LauncherLastStartTimesEntry *entry;
    1062             :     bool        found;
    1063             : 
    1064         402 :     logicalrep_launcher_attach_dshmem();
    1065             : 
    1066         402 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
    1067         402 :     entry->last_start_time = start_time;
    1068         402 :     dshash_release_lock(last_start_times, entry);
    1069         402 : }
    1070             : 
    1071             : /*
    1072             :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1073             :  */
    1074             : static TimestampTz
    1075         644 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1076             : {
    1077             :     LauncherLastStartTimesEntry *entry;
    1078             :     TimestampTz ret;
    1079             : 
    1080         644 :     logicalrep_launcher_attach_dshmem();
    1081             : 
    1082         644 :     entry = dshash_find(last_start_times, &subid, false);
    1083         644 :     if (entry == NULL)
    1084         284 :         return 0;
    1085             : 
    1086         360 :     ret = entry->last_start_time;
    1087         360 :     dshash_release_lock(last_start_times, entry);
    1088             : 
    1089         360 :     return ret;
    1090             : }
    1091             : 
    1092             : /*
    1093             :  * Remove the last-start-time entry for the subscription, if one exists.
    1094             :  *
    1095             :  * This has two use-cases: to remove the entry related to a subscription
    1096             :  * that's been deleted or disabled (just to avoid leaking shared memory),
    1097             :  * and to allow immediate restart of an apply worker that has exited
    1098             :  * due to subscription parameter changes.
    1099             :  */
    1100             : void
    1101         442 : ApplyLauncherForgetWorkerStartTime(Oid subid)
    1102             : {
    1103         442 :     logicalrep_launcher_attach_dshmem();
    1104             : 
    1105         442 :     (void) dshash_delete_key(last_start_times, &subid);
    1106         442 : }
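
    The three functions above share the dshash locking discipline:
    dshash_find() and dshash_find_or_insert() return the entry with its
    hash-partition lock held, so the caller must call dshash_release_lock()
    before returning, while dshash_delete_key() handles locking internally.
    A minimal sketch of a mutating lookup (hypothetical helper, not part of
    launcher.c):

        static void
        sketch_clear_start_time(Oid subid)
        {
            LauncherLastStartTimesEntry *entry;

            logicalrep_launcher_attach_dshmem();

            /* exclusive = true because we modify the entry in place */
            entry = dshash_find(last_start_times, &subid, true);
            if (entry == NULL)
                return;         /* no entry recorded for this subscription */

            entry->last_start_time = 0;
            dshash_release_lock(last_start_times, entry);   /* mandatory */
        }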
    1107             : 
    1108             : /*
    1109             :  * Wake up the launcher on commit if requested.
    1110             :  */
    1111             : void
    1112     1100002 : AtEOXact_ApplyLauncher(bool isCommit)
    1113             : {
    1114     1100002 :     if (isCommit)
    1115             :     {
    1116     1049268 :         if (on_commit_launcher_wakeup)
    1117         286 :             ApplyLauncherWakeup();
    1118             :     }
    1119             : 
    1120     1100002 :     on_commit_launcher_wakeup = false;
    1121     1100002 : }
    1122             : 
    1123             : /*
    1124             :  * Request wakeup of the launcher on commit of the transaction.
    1125             :  *
    1126             :  * This signals the launcher to stop sleeping and process the subscriptions
    1127             :  * when the current transaction commits. It should be used when a new tuple
    1128             :  * has been added to the pg_subscription catalog.
    1129             :  */
    1130             : void
    1131         288 : ApplyLauncherWakeupAtCommit(void)
    1132             : {
    1133         288 :     if (!on_commit_launcher_wakeup)
    1134         286 :         on_commit_launcher_wakeup = true;
    1135         288 : }
    1136             : 
    1137             : /*
    1138             :  * Wake up the launcher immediately.
    1139             :  */
    1140             : void
    1141        1392 : ApplyLauncherWakeup(void)
    1142             : {
    1143        1392 :     if (LogicalRepCtx->launcher_pid != 0)
    1144        1362 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1145        1392 : }
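
    Taken together, the three functions above implement a deferred wakeup:
    DDL that adds or enables a subscription merely sets a backend-local
    flag, and the SIGUSR1 (whose handler sets the launcher's latch) is sent
    only at commit, so an aborted transaction never wakes the launcher. A
    hedged sketch of such a caller (hypothetical function; the real call
    sites live in the subscription DDL code):

        static void
        sketch_enable_subscription(void)
        {
            /* ... update the subscription's pg_subscription tuple ... */

            /*
             * Defer the signal: if this transaction aborts, the catalog
             * change disappears and waking the launcher would be wasted.
             */
            ApplyLauncherWakeupAtCommit();
        }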
    1146             : 
    1147             : /*
    1148             :  * Main loop for the apply launcher process.
    1149             :  */
    1150             : void
    1151         852 : ApplyLauncherMain(Datum main_arg)
    1152             : {
    1153         852 :     ereport(DEBUG1,
    1154             :             (errmsg_internal("logical replication launcher started")));
    1155             : 
    1156         852 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1157             : 
    1158             :     Assert(LogicalRepCtx->launcher_pid == 0);
    1159         852 :     LogicalRepCtx->launcher_pid = MyProcPid;
    1160             : 
    1161             :     /* Establish signal handlers. */
    1162         852 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1163         852 :     pqsignal(SIGTERM, die);
    1164         852 :     BackgroundWorkerUnblockSignals();
    1165             : 
    1166             :     /*
    1167             :      * Establish connection to nailed catalogs (we only ever access
    1168             :      * pg_subscription).
    1169             :      */
    1170         852 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1171             : 
    1172             :     /*
    1173             :      * Acquire the conflict detection slot at startup to ensure it can be
    1174             :      * dropped if no longer needed after a restart.
    1175             :      */
    1176         852 :     acquire_conflict_slot_if_exists();
    1177             : 
    1178             :     /* Enter main loop */
    1179             :     for (;;)
    1180        4896 :     {
    1181             :         int         rc;
    1182             :         List       *sublist;
    1183             :         ListCell   *lc;
    1184             :         MemoryContext subctx;
    1185             :         MemoryContext oldctx;
    1186        5748 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1187        5748 :         bool        can_update_xmin = true;
    1188        5748 :         bool        retain_dead_tuples = false;
    1189        5748 :         TransactionId xmin = InvalidTransactionId;
    1190             : 
    1191        5748 :         CHECK_FOR_INTERRUPTS();
    1192             : 
    1193             :         /* Use temporary context to avoid leaking memory across cycles. */
    1194        5742 :         subctx = AllocSetContextCreate(TopMemoryContext,
    1195             :                                        "Logical Replication Launcher sublist",
    1196             :                                        ALLOCSET_DEFAULT_SIZES);
    1197        5742 :         oldctx = MemoryContextSwitchTo(subctx);
    1198             : 
    1199             :         /*
    1200             :          * Start any missing workers for enabled subscriptions.
    1201             :          *
    1202             :          * Also, during the iteration through all subscriptions, we compute
    1203             :          * the minimum XID required to protect deleted tuples for conflict
    1204             :          * detection if one of the subscription enables retain_dead_tuples
    1205             :          * option.
    1206             :          */
    1207        5742 :         sublist = get_subscription_list();
    1208        7468 :         foreach(lc, sublist)
    1209             :         {
    1210        1726 :             Subscription *sub = (Subscription *) lfirst(lc);
    1211             :             LogicalRepWorker *w;
    1212             :             TimestampTz last_start;
    1213             :             TimestampTz now;
    1214             :             long        elapsed;
    1215             : 
    1216        1726 :             if (sub->retaindeadtuples)
    1217             :             {
    1218         108 :                 retain_dead_tuples = true;
    1219             : 
    1220             :                 /*
    1221             :                  * Create a replication slot to retain information necessary
    1222             :                  * for conflict detection such as dead tuples, commit
    1223             :                  * timestamps, and origins.
    1224             :                  *
    1225             :                  * The slot is created before starting the apply worker to
    1226             :                  * prevent it from unnecessarily maintaining its
    1227             :                  * oldest_nonremovable_xid.
    1228             :                  *
    1229             :                  * The slot is created even for a disabled subscription to
    1230             :                  * ensure that conflict-related information is available when
    1231             :                  * applying remote changes that occurred before the
    1232             :                  * subscription was enabled.
    1233             :                  */
    1234         108 :                 CreateConflictDetectionSlot();
    1235             : 
    1236         108 :                 if (sub->retentionactive)
    1237             :                 {
    1238             :                     /*
    1239             :                      * Can't advance xmin of the slot unless all the
    1240             :                      * subscriptions actively retaining dead tuples are
    1241             :                      * enabled. This is required to ensure that we don't
    1242             :                      * advance the xmin of CONFLICT_DETECTION_SLOT if one of
    1243             :                      * the subscriptions is not enabled. Otherwise, we won't
    1244             :                      * be able to detect conflicts reliably for such a
    1245             :                      * subscription even though it has set the
    1246             :                      * retain_dead_tuples option.
    1247             :                      */
    1248         108 :                     can_update_xmin &= sub->enabled;
    1249             : 
    1250             :                     /*
    1251             :                      * Initialize the slot once the subscription activates
    1252             :                      * retention.
    1253             :                      */
    1254         108 :                     if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
    1255           0 :                         init_conflict_slot_xmin();
    1256             :                 }
    1257             :             }
    1258             : 
    1259        1726 :             if (!sub->enabled)
    1260          74 :                 continue;
    1261             : 
    1262        1652 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1263        1652 :             w = logicalrep_worker_find(sub->oid, InvalidOid, false);
    1264             : 
    1265        1652 :             if (w != NULL)
    1266             :             {
    1267             :                 /*
    1268             :                  * Compute the minimum xmin required to protect dead tuples
    1269             :                  * required for conflict detection among all running apply
    1270             :                  * workers. This computation is performed while holding
    1271             :                  * LogicalRepWorkerLock to prevent accessing invalid worker
    1272             :                  * data, in scenarios where a worker might exit and reset its
    1273             :                  * state concurrently.
    1274             :                  */
    1275        1008 :                 if (sub->retaindeadtuples &&
    1276         100 :                     sub->retentionactive &&
    1277             :                     can_update_xmin)
    1278         100 :                     compute_min_nonremovable_xid(w, &xmin);
    1279             : 
    1280        1008 :                 LWLockRelease(LogicalRepWorkerLock);
    1281             : 
    1282             :                 /* worker is running already */
    1283        1008 :                 continue;
    1284             :             }
    1285             : 
    1286         644 :             LWLockRelease(LogicalRepWorkerLock);
    1287             : 
    1288             :             /*
    1289             :              * Can't advance the slot's xmin unless all the workers
    1290             :              * corresponding to subscriptions actively retaining dead tuples
    1291             :              * are running, so disable any further computation of the
    1292             :              * minimum nonremovable xid.
    1293             :              */
    1294         644 :             if (sub->retaindeadtuples && sub->retentionactive)
    1295           4 :                 can_update_xmin = false;
    1296             : 
    1297             :             /*
    1298             :              * If the worker is eligible to start now, launch it.  Otherwise,
    1299             :              * adjust wait_time so that we'll wake up as soon as it can be
    1300             :              * started.
    1301             :              *
    1302             :              * Each subscription's apply worker can only be restarted once per
    1303             :              * wal_retrieve_retry_interval, so that errors do not cause us to
    1304             :              * repeatedly restart the worker as fast as possible.  In cases
    1305             :              * where a restart is expected (e.g., subscription parameter
    1306             :              * changes), another process should remove the last-start entry
    1307             :              * for the subscription so that the worker can be restarted
    1308             :              * without waiting for wal_retrieve_retry_interval to elapse.
    1309             :              */
    1310         644 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1311         644 :             now = GetCurrentTimestamp();
    1312         644 :             if (last_start == 0 ||
    1313         360 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1314             :             {
    1315         402 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1316         402 :                 if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
    1317         402 :                                               sub->dbid, sub->oid, sub->name,
    1318             :                                               sub->owner, InvalidOid,
    1319             :                                               DSM_HANDLE_INVALID,
    1320         406 :                                               sub->retaindeadtuples &&
    1321           4 :                                               sub->retentionactive))
    1322             :                 {
    1323             :                     /*
    1324             :                      * We get here either if we failed to launch a worker
    1325             :                      * (perhaps for resource-exhaustion reasons) or if we
    1326             :                      * launched one but it immediately quit.  Either way, it
    1327             :                      * seems appropriate to try again after
    1328             :                      * wal_retrieve_retry_interval.
    1329             :                      */
    1330          12 :                     wait_time = Min(wait_time,
    1331             :                                     wal_retrieve_retry_interval);
    1332             :                 }
    1333             :             }
    1334             :             else
    1335             :             {
    1336         242 :                 wait_time = Min(wait_time,
    1337             :                                 wal_retrieve_retry_interval - elapsed);
    1338             :             }
    1339             :         }
    1340             : 
    1341             :         /*
    1342             :          * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
    1343             :          * that requires us to retain dead tuples. Otherwise, if required,
    1344             :          * advance the slot's xmin to protect dead tuples required for the
    1345             :          * conflict detection.
    1346             :          *
    1347             :          * Additionally, if all apply workers for subscriptions with
    1348             :          * retain_dead_tuples enabled have requested to stop retention, the
    1349             :          * slot's xmin will be set to InvalidTransactionId allowing the
    1350             :          * removal of dead tuples.
    1351             :          */
    1352        5742 :         if (MyReplicationSlot)
    1353             :         {
    1354         110 :             if (!retain_dead_tuples)
    1355           2 :                 ReplicationSlotDropAcquired();
    1356         108 :             else if (can_update_xmin)
    1357         100 :                 update_conflict_slot_xmin(xmin);
    1358             :         }
    1359             : 
    1360             :         /* Switch back to original memory context. */
    1361        5742 :         MemoryContextSwitchTo(oldctx);
    1362             :         /* Clean the temporary memory. */
    1363        5742 :         MemoryContextDelete(subctx);
    1364             : 
    1365             :         /* Wait for more work. */
    1366        5742 :         rc = WaitLatch(MyLatch,
    1367             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1368             :                        wait_time,
    1369             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1370             : 
    1371        5736 :         if (rc & WL_LATCH_SET)
    1372             :         {
    1373        5684 :             ResetLatch(MyLatch);
    1374        5684 :             CHECK_FOR_INTERRUPTS();
    1375             :         }
    1376             : 
    1377        4896 :         if (ConfigReloadPending)
    1378             :         {
    1379          72 :             ConfigReloadPending = false;
    1380          72 :             ProcessConfigFile(PGC_SIGHUP);
    1381             :         }
    1382             :     }
    1383             : 
    1384             :     /* Not reachable */
    1385             : }
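
    The restart throttling in the loop above can be summarized in
    isolation: a worker may start immediately if it has never started or if
    wal_retrieve_retry_interval milliseconds have elapsed since its last
    start; otherwise the launcher sleeps only until the worker becomes
    eligible. A sketch under those assumptions (hypothetical helper, not
    part of launcher.c):

        static long
        sketch_next_wait_time(TimestampTz last_start, TimestampTz now,
                              long wait_time)
        {
            long        elapsed;

            if (last_start == 0)
                return wait_time;   /* never started: launch immediately */

            elapsed = TimestampDifferenceMilliseconds(last_start, now);
            if (elapsed >= wal_retrieve_retry_interval)
                return wait_time;   /* eligible: launch now */

            /* not yet eligible: wake up exactly when it becomes so */
            return Min(wait_time, wal_retrieve_retry_interval - elapsed);
        }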
    1386             : 
    1387             : /*
    1388             :  * Determine the minimum non-removable transaction ID across all apply workers
    1389             :  * for subscriptions that have retain_dead_tuples enabled. Store the result
    1390             :  * in *xmin.
    1391             :  */
    1392             : static void
    1393         100 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
    1394             : {
    1395             :     TransactionId nonremovable_xid;
    1396             : 
    1397             :     Assert(worker != NULL);
    1398             : 
    1399             :     /*
    1400             :      * The replication slot for conflict detection must be created before the
    1401             :      * worker starts.
    1402             :      */
    1403             :     Assert(MyReplicationSlot);
    1404             : 
    1405         100 :     SpinLockAcquire(&worker->relmutex);
    1406         100 :     nonremovable_xid = worker->oldest_nonremovable_xid;
    1407         100 :     SpinLockRelease(&worker->relmutex);
    1408             : 
    1409             :     /*
    1410             :      * Return if the apply worker has stopped retention concurrently.
    1411             :      *
    1412             :      * Although this function is invoked only when retentionactive is true,
    1413             :      * the apply worker might stop retention after the launcher fetches the
    1414             :      * retentionactive flag.
    1415             :      */
    1416         100 :     if (!TransactionIdIsValid(nonremovable_xid))
    1417           0 :         return;
    1418             : 
    1419         100 :     if (!TransactionIdIsValid(*xmin) ||
    1420           0 :         TransactionIdPrecedes(nonremovable_xid, *xmin))
    1421         100 :         *xmin = nonremovable_xid;
    1422             : }
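
    The minimum is taken with TransactionIdPrecedes() rather than a plain
    "<" because normal transaction IDs compare in circular modulo-2^32
    space: the comparison subtracts in 32-bit arithmetic and tests the
    sign. A small illustration (not part of launcher.c):

        static void
        sketch_xid_wraparound(void)
        {
            TransactionId old_xid = (TransactionId) 4294967290U;
            TransactionId new_xid = (TransactionId) 10; /* post-wraparound */

            /* Circular order: old_xid precedes new_xid despite being larger. */
            Assert(TransactionIdPrecedes(old_xid, new_xid));

            /* A plain arithmetic comparison would conclude the opposite. */
            Assert(!(old_xid < new_xid));
        }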
    1423             : 
    1424             : /*
    1425             :  * Acquire the replication slot used to retain information for conflict
    1426             :  * detection, if it exists.
    1427             :  *
    1428             :  * Return true if successfully acquired, otherwise return false.
    1429             :  */
    1430             : static bool
    1431         852 : acquire_conflict_slot_if_exists(void)
    1432             : {
    1433         852 :     if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
    1434         850 :         return false;
    1435             : 
    1436           2 :     ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
    1437           2 :     return true;
    1438             : }
    1439             : 
    1440             : /*
    1441             :  * Update the xmin of the replication slot used to retain information
    1442             :  * required for conflict detection.
    1443             :  */
    1444             : static void
    1445         100 : update_conflict_slot_xmin(TransactionId new_xmin)
    1446             : {
    1447             :     Assert(MyReplicationSlot);
    1448             :     Assert(!TransactionIdIsValid(new_xmin) ||
    1449             :            TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
    1450             : 
    1451             :     /* Quick exit if the slot's xmin is already up to date */
    1452         100 :     if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
    1453          70 :         return;
    1454             : 
    1455          30 :     SpinLockAcquire(&MyReplicationSlot->mutex);
    1456          30 :     MyReplicationSlot->effective_xmin = new_xmin;
    1457          30 :     MyReplicationSlot->data.xmin = new_xmin;
    1458          30 :     SpinLockRelease(&MyReplicationSlot->mutex);
    1459             : 
    1460          30 :     elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
    1461             : 
    1462          30 :     ReplicationSlotMarkDirty();
    1463          30 :     ReplicationSlotsComputeRequiredXmin(false);
    1464             : 
    1465             :     /*
    1466             :      * Like PhysicalConfirmReceivedLocation(), do not save slot information
    1467             :      * each time. This is acceptable because all concurrent transactions on
    1468             :      * the publisher that require the data preceding the slot's xmin should
    1469             :      * have already been applied and flushed on the subscriber before the xmin
    1470             :      * is advanced. So, even if the slot's xmin regresses after a restart, it
    1471             :      * will be advanced again in the next cycle. Therefore, no data required
    1472             :      * for conflict detection will be prematurely removed.
    1473             :      */
    1474          30 :     return;
    1475             : }
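
    Because the writer above changes effective_xmin and data.xmin only
    while holding the slot's spinlock, concurrent readers in other
    processes must take the same lock to get a consistent value. A minimal
    reader sketch (hypothetical helper, not part of launcher.c):

        static TransactionId
        sketch_read_slot_xmin(ReplicationSlot *slot)
        {
            TransactionId xmin;

            SpinLockAcquire(&slot->mutex);
            xmin = slot->data.xmin;
            SpinLockRelease(&slot->mutex);

            return xmin;
        }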
    1476             : 
    1477             : /*
    1478             :  * Initialize the xmin for the conflict detection slot.
    1479             :  */
    1480             : static void
    1481           8 : init_conflict_slot_xmin(void)
    1482             : {
    1483             :     TransactionId xmin_horizon;
    1484             : 
    1485             :     /* Replication slot must exist but shouldn't be initialized. */
    1486             :     Assert(MyReplicationSlot &&
    1487             :            !TransactionIdIsValid(MyReplicationSlot->data.xmin));
    1488             : 
    1489           8 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    1490             : 
    1491           8 :     xmin_horizon = GetOldestSafeDecodingTransactionId(false);
    1492             : 
    1493           8 :     SpinLockAcquire(&MyReplicationSlot->mutex);
    1494           8 :     MyReplicationSlot->effective_xmin = xmin_horizon;
    1495           8 :     MyReplicationSlot->data.xmin = xmin_horizon;
    1496           8 :     SpinLockRelease(&MyReplicationSlot->mutex);
    1497             : 
    1498           8 :     ReplicationSlotsComputeRequiredXmin(true);
    1499             : 
    1500           8 :     LWLockRelease(ProcArrayLock);
    1501             : 
    1502             :     /* Write this slot to disk */
    1503           8 :     ReplicationSlotMarkDirty();
    1504           8 :     ReplicationSlotSave();
    1505           8 : }
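
    Note the lock ordering here: ProcArrayLock is held exclusively from the
    point the safe horizon is computed until the new xmin is published via
    ReplicationSlotsComputeRequiredXmin(true), so the global horizon cannot
    advance past the value being installed; the slot is then flushed to
    disk so the retained horizon survives a restart.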
    1506             : 
    1507             : /*
    1508             :  * Create and acquire the replication slot used to retain information for
    1509             :  * conflict detection, if that has not already been done.
    1510             :  */
    1511             : void
    1512         110 : CreateConflictDetectionSlot(void)
    1513             : {
    1514             :     /* Exit early if the replication slot was already created and acquired */
    1515         110 :     if (MyReplicationSlot)
    1516         102 :         return;
    1517             : 
    1518           8 :     ereport(LOG,
    1519             :             errmsg("creating replication conflict detection slot"));
    1520             : 
    1521           8 :     ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
    1522             :                           false, false);
    1523             : 
    1524           8 :     init_conflict_slot_xmin();
    1525             : }
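
    Since the function exits early once MyReplicationSlot is set, it is
    idempotent, and the launcher can call it unconditionally for every
    retain_dead_tuples subscription it encounters in a cycle, as the main
    loop above does. Once created, the slot is visible like any other
    replication slot, for example via "SELECT * FROM pg_replication_slots",
    under the reserved name that CONFLICT_DETECTION_SLOT expands to.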
    1526             : 
    1527             : /*
    1528             :  * Is current process the logical replication launcher?
    1529             :  */
    1530             : bool
    1531        4896 : IsLogicalLauncher(void)
    1532             : {
    1533        4896 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1534             : }
    1535             : 
    1536             : /*
    1537             :  * Return the pid of the leader apply worker if the given pid is the pid of a
    1538             :  * parallel apply worker, otherwise, return InvalidPid.
    1539             :  */
    1540             : pid_t
    1541        1896 : GetLeaderApplyWorkerPid(pid_t pid)
    1542             : {
    1543        1896 :     int         leader_pid = InvalidPid;
    1544             :     int         i;
    1545             : 
    1546        1896 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1547             : 
    1548        9480 :     for (i = 0; i < max_logical_replication_workers; i++)
    1549             :     {
    1550        7584 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1551             : 
    1552        7584 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
    1553             :         {
    1554           0 :             leader_pid = w->leader_pid;
    1555           0 :             break;
    1556             :         }
    1557             :     }
    1558             : 
    1559        1896 :     LWLockRelease(LogicalRepWorkerLock);
    1560             : 
    1561        1896 :     return leader_pid;
    1562             : }
    1563             : 
    1564             : /*
    1565             :  * Return the state of each subscription's workers.
    1566             :  */
    1567             : Datum
    1568           2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1569             : {
    1570             : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
    1571           2 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1572             :     int         i;
    1573           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1574             : 
    1575           2 :     InitMaterializedSRF(fcinfo, 0);
    1576             : 
    1577             :     /* Make sure we get consistent view of the workers. */
    1578           2 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1579             : 
    1580          10 :     for (i = 0; i < max_logical_replication_workers; i++)
    1581             :     {
    1582             :         /* for each row */
    1583           8 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1584           8 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1585             :         int         worker_pid;
    1586             :         LogicalRepWorker worker;
    1587             : 
    1588           8 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1589             :                sizeof(LogicalRepWorker));
    1590           8 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1591           4 :             continue;
    1592             : 
    1593           4 :         if (OidIsValid(subid) && worker.subid != subid)
    1594           0 :             continue;
    1595             : 
    1596           4 :         worker_pid = worker.proc->pid;
    1597             : 
    1598           4 :         values[0] = ObjectIdGetDatum(worker.subid);
    1599           4 :         if (isTablesyncWorker(&worker))
    1600           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1601             :         else
    1602           4 :             nulls[1] = true;
    1603           4 :         values[2] = Int32GetDatum(worker_pid);
    1604             : 
    1605           4 :         if (isParallelApplyWorker(&worker))
    1606           0 :             values[3] = Int32GetDatum(worker.leader_pid);
    1607             :         else
    1608           4 :             nulls[3] = true;
    1609             : 
    1610           4 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
    1611           2 :             nulls[4] = true;
    1612             :         else
    1613           2 :             values[4] = LSNGetDatum(worker.last_lsn);
    1614           4 :         if (worker.last_send_time == 0)
    1615           0 :             nulls[5] = true;
    1616             :         else
    1617           4 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
    1618           4 :         if (worker.last_recv_time == 0)
    1619           0 :             nulls[6] = true;
    1620             :         else
    1621           4 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1622           4 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1623           2 :             nulls[7] = true;
    1624             :         else
    1625           2 :             values[7] = LSNGetDatum(worker.reply_lsn);
    1626           4 :         if (worker.reply_time == 0)
    1627           0 :             nulls[8] = true;
    1628             :         else
    1629           4 :             values[8] = TimestampTzGetDatum(worker.reply_time);
    1630             : 
    1631           4 :         switch (worker.type)
    1632             :         {
    1633           4 :             case WORKERTYPE_APPLY:
    1634           4 :                 values[9] = CStringGetTextDatum("apply");
    1635           4 :                 break;
    1636           0 :             case WORKERTYPE_PARALLEL_APPLY:
    1637           0 :                 values[9] = CStringGetTextDatum("parallel apply");
    1638           0 :                 break;
    1639           0 :             case WORKERTYPE_TABLESYNC:
    1640           0 :                 values[9] = CStringGetTextDatum("table synchronization");
    1641           0 :                 break;
    1642           0 :             case WORKERTYPE_UNKNOWN:
    1643             :                 /* Should never happen. */
    1644           0 :                 elog(ERROR, "unknown worker type");
    1645             :         }
    1646             : 
    1647           4 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1648             :                              values, nulls);
    1649             : 
    1650             :         /*
    1651             :          * If only a single subscription was requested, and we found it,
    1652             :          * break.
    1653             :          */
    1654           4 :         if (OidIsValid(subid))
    1655           0 :             break;
    1656             :     }
    1657             : 
    1658           2 :     LWLockRelease(LogicalRepWorkerLock);
    1659             : 
    1660           2 :     return (Datum) 0;
    1661             : }
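
    This function backs the pg_stat_subscription system view. The
    materialized set-returning-function pattern it uses is compact enough
    to sketch on its own: InitMaterializedSRF() sets up a tuplestore in the
    ReturnSetInfo, and each row is appended with tuplestore_putvalues(). A
    minimal sketch, assuming a hypothetical one-column int4 function:

        Datum
        sketch_list_worker_slots(PG_FUNCTION_ARGS)
        {
            ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

            InitMaterializedSRF(fcinfo, 0);

            for (int i = 0; i < max_logical_replication_workers; i++)
            {
                Datum       values[1];
                bool        nulls[1] = {0};

                values[0] = Int32GetDatum(i);
                tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                                     values, nulls);
            }

            return (Datum) 0;
        }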

Generated by: LCOV version 1.16