LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions)
Test: PostgreSQL 19devel
Date: 2025-09-10 21:18:40

                 Hit   Total   Coverage
    Lines:       491     544     90.3 %
    Functions:    36      36    100.0 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
       11             :  *    This module contains the logical replication worker launcher, which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "access/heapam.h"
      21             : #include "access/htup.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/pg_subscription.h"
      26             : #include "catalog/pg_subscription_rel.h"
      27             : #include "funcapi.h"
      28             : #include "lib/dshash.h"
      29             : #include "miscadmin.h"
      30             : #include "pgstat.h"
      31             : #include "postmaster/bgworker.h"
      32             : #include "postmaster/interrupt.h"
      33             : #include "replication/logicallauncher.h"
      34             : #include "replication/origin.h"
      35             : #include "replication/slot.h"
      36             : #include "replication/walreceiver.h"
      37             : #include "replication/worker_internal.h"
      38             : #include "storage/ipc.h"
      39             : #include "storage/proc.h"
      40             : #include "storage/procarray.h"
      41             : #include "tcop/tcopprot.h"
      42             : #include "utils/builtins.h"
      43             : #include "utils/memutils.h"
      44             : #include "utils/pg_lsn.h"
      45             : #include "utils/snapmgr.h"
      46             : #include "utils/syscache.h"
      47             : 
      48             : /* max sleep time between cycles (3min) */
      49             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      50             : 
      51             : /* GUC variables */
      52             : int         max_logical_replication_workers = 4;
      53             : int         max_sync_workers_per_subscription = 2;
      54             : int         max_parallel_apply_workers_per_subscription = 2;
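
These limits are ordinary GUCs; a minimal postgresql.conf excerpt matching the
defaults initialized above (illustrative):

    max_logical_replication_workers = 4
    max_sync_workers_per_subscription = 2
    max_parallel_apply_workers_per_subscription = 2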
      55             : 
      56             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      57             : 
      58             : typedef struct LogicalRepCtxStruct
      59             : {
      60             :     /* Supervisor process. */
      61             :     pid_t       launcher_pid;
      62             : 
      63             :     /* Hash table holding last start times of subscriptions' apply workers. */
      64             :     dsa_handle  last_start_dsa;
      65             :     dshash_table_handle last_start_dsh;
      66             : 
      67             :     /* Background workers. */
      68             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      69             : } LogicalRepCtxStruct;
      70             : 
      71             : static LogicalRepCtxStruct *LogicalRepCtx;
      72             : 
      73             : /* an entry in the last-start-times shared hash table */
      74             : typedef struct LauncherLastStartTimesEntry
      75             : {
      76             :     Oid         subid;          /* OID of logrep subscription (hash key) */
      77             :     TimestampTz last_start_time;    /* last time its apply worker was started */
      78             : } LauncherLastStartTimesEntry;
      79             : 
      80             : /* parameters for the last-start-times shared hash table */
      81             : static const dshash_parameters dsh_params = {
      82             :     sizeof(Oid),
      83             :     sizeof(LauncherLastStartTimesEntry),
      84             :     dshash_memcmp,
      85             :     dshash_memhash,
      86             :     dshash_memcpy,
      87             :     LWTRANCHE_LAUNCHER_HASH
      88             : };
      89             : 
      90             : static dsa_area *last_start_times_dsa = NULL;
      91             : static dshash_table *last_start_times = NULL;
      92             : 
      93             : static bool on_commit_launcher_wakeup = false;
      94             : 
      95             : 
      96             : static void logicalrep_launcher_onexit(int code, Datum arg);
      97             : static void logicalrep_worker_onexit(int code, Datum arg);
      98             : static void logicalrep_worker_detach(void);
      99             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
     100             : static int  logicalrep_pa_worker_count(Oid subid);
     101             : static void logicalrep_launcher_attach_dshmem(void);
     102             : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
     103             : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
     104             : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
     105             : static bool acquire_conflict_slot_if_exists(void);
     106             : static void update_conflict_slot_xmin(TransactionId new_xmin);
     107             : static void init_conflict_slot_xmin(void);
     108             : 
     109             : 
     110             : /*
     111             :  * Load the list of subscriptions.
     112             :  *
     113             :  * Only the fields interesting for worker start/stop functions are filled for
     114             :  * each subscription.
     115             :  */
     116             : static List *
     117        5696 : get_subscription_list(void)
     118             : {
     119        5696 :     List       *res = NIL;
     120             :     Relation    rel;
     121             :     TableScanDesc scan;
     122             :     HeapTuple   tup;
     123             :     MemoryContext resultcxt;
     124             : 
     125             :     /* This is the context that we will allocate our output data in */
     126        5696 :     resultcxt = CurrentMemoryContext;
     127             : 
     128             :     /*
     129             :      * Start a transaction so we can access pg_subscription.
     130             :      */
     131        5696 :     StartTransactionCommand();
     132             : 
     133        5696 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     134        5696 :     scan = table_beginscan_catalog(rel, 0, NULL);
     135             : 
     136        7528 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     137             :     {
     138        1832 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     139             :         Subscription *sub;
     140             :         MemoryContext oldcxt;
     141             : 
     142             :         /*
     143             :          * Allocate our results in the caller's context, not the
     144             :          * transaction's. We do this inside the loop, and restore the original
     145             :          * context at the end, so that leaky things like heap_getnext() are
     146             :          * not called in a potentially long-lived context.
     147             :          */
     148        1832 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     149             : 
     150        1832 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     151        1832 :         sub->oid = subform->oid;
     152        1832 :         sub->dbid = subform->subdbid;
     153        1832 :         sub->owner = subform->subowner;
     154        1832 :         sub->enabled = subform->subenabled;
     155        1832 :         sub->name = pstrdup(NameStr(subform->subname));
     156        1832 :         sub->retaindeadtuples = subform->subretaindeadtuples;
     157        1832 :         sub->retentionactive = subform->subretentionactive;
     158             :         /* We don't fill fields we are not interested in. */
     159             : 
     160        1832 :         res = lappend(res, sub);
     161        1832 :         MemoryContextSwitchTo(oldcxt);
     162             :     }
     163             : 
     164        5696 :     table_endscan(scan);
     165        5696 :     table_close(rel, AccessShareLock);
     166             : 
     167        5696 :     CommitTransactionCommand();
     168             : 
     169        5696 :     return res;
     170             : }
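
The launcher consumes this list in its main loop (ApplyLauncherMain, which is
outside this excerpt); a minimal sketch of that pattern, with the worker-start
decision elided:

    List       *sublist = get_subscription_list();
    ListCell   *lc;

    foreach(lc, sublist)
    {
        Subscription *sub = (Subscription *) lfirst(lc);

        if (!sub->enabled)
            continue;       /* only enabled subscriptions get apply workers */

        /* ... decide whether to (re)start the apply worker for sub ... */
    }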
     171             : 
     172             : /*
     173             :  * Wait for a background worker to start up and attach to the shmem context.
     174             :  *
     175             :  * This is only needed for cleaning up the shared memory in case the worker
     176             :  * fails to attach.
     177             :  *
     178             :  * Returns whether the attach was successful.
     179             :  */
     180             : static bool
     181         838 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     182             :                                uint16 generation,
     183             :                                BackgroundWorkerHandle *handle)
     184             : {
     185         838 :     bool        result = false;
     186         838 :     bool        dropped_latch = false;
     187             : 
     188             :     for (;;)
     189        3208 :     {
     190             :         BgwHandleStatus status;
     191             :         pid_t       pid;
     192             :         int         rc;
     193             : 
     194        4046 :         CHECK_FOR_INTERRUPTS();
     195             : 
     196        4046 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     197             : 
     198             :         /* Worker either died or has started. Return false if died. */
     199        4046 :         if (!worker->in_use || worker->proc)
     200             :         {
     201         836 :             result = worker->in_use;
     202         836 :             LWLockRelease(LogicalRepWorkerLock);
     203         836 :             break;
     204             :         }
     205             : 
     206        3210 :         LWLockRelease(LogicalRepWorkerLock);
     207             : 
     208             :         /* Check if worker has died before attaching, and clean up after it. */
     209        3210 :         status = GetBackgroundWorkerPid(handle, &pid);
     210             : 
     211        3210 :         if (status == BGWH_STOPPED)
     212             :         {
     213           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     214             :             /* Ensure that this was indeed the worker we waited for. */
     215           0 :             if (generation == worker->generation)
     216           0 :                 logicalrep_worker_cleanup(worker);
     217           0 :             LWLockRelease(LogicalRepWorkerLock);
     218           0 :             break;              /* result is already false */
     219             :         }
     220             : 
     221             :         /*
      222             :          * We need a timeout because we generally don't get notified via latch
     223             :          * about the worker attach.  But we don't expect to have to wait long.
     224             :          */
     225        3210 :         rc = WaitLatch(MyLatch,
     226             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     227             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     228             : 
     229        3210 :         if (rc & WL_LATCH_SET)
     230             :         {
     231        1014 :             ResetLatch(MyLatch);
     232        1014 :             CHECK_FOR_INTERRUPTS();
     233        1012 :             dropped_latch = true;
     234             :         }
     235             :     }
     236             : 
     237             :     /*
     238             :      * If we had to clear a latch event in order to wait, be sure to restore
     239             :      * it before exiting.  Otherwise caller may miss events.
     240             :      */
     241         836 :     if (dropped_latch)
     242         836 :         SetLatch(MyLatch);
     243             : 
     244         836 :     return result;
     245             : }
     246             : 
     247             : /*
      248             :  * Walks the workers array and searches for one that matches the given
     249             :  * subscription id and relid.
     250             :  *
     251             :  * We are only interested in the leader apply worker or table sync worker.
     252             :  */
     253             : LogicalRepWorker *
     254        6138 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     255             : {
     256             :     int         i;
     257        6138 :     LogicalRepWorker *res = NULL;
     258             : 
     259             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     260             : 
     261             :     /* Search for attached worker for a given subscription id. */
     262       18514 :     for (i = 0; i < max_logical_replication_workers; i++)
     263             :     {
     264       16166 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     265             : 
     266             :         /* Skip parallel apply workers. */
     267       16166 :         if (isParallelApplyWorker(w))
     268           0 :             continue;
     269             : 
     270       16166 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     271        3790 :             (!only_running || w->proc))
     272             :         {
     273        3790 :             res = w;
     274        3790 :             break;
     275             :         }
     276             :     }
     277             : 
     278        6138 :     return res;
     279             : }
     280             : 
     281             : /*
     282             :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     283             :  * the subscription, instead of just one.
     284             :  */
     285             : List *
     286        1336 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
     287             : {
     288             :     int         i;
     289        1336 :     List       *res = NIL;
     290             : 
     291        1336 :     if (acquire_lock)
     292         236 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     293             : 
     294             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     295             : 
     296             :     /* Search for attached worker for a given subscription id. */
     297        6936 :     for (i = 0; i < max_logical_replication_workers; i++)
     298             :     {
     299        5600 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     300             : 
     301        5600 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     302         958 :             res = lappend(res, w);
     303             :     }
     304             : 
     305        1336 :     if (acquire_lock)
     306         236 :         LWLockRelease(LogicalRepWorkerLock);
     307             : 
     308        1336 :     return res;
     309             : }
     310             : 
     311             : /*
     312             :  * Start new logical replication background worker, if possible.
     313             :  *
     314             :  * Returns true on success, false on failure.
     315             :  */
     316             : bool
     317         846 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
     318             :                          Oid dbid, Oid subid, const char *subname, Oid userid,
     319             :                          Oid relid, dsm_handle subworker_dsm,
     320             :                          bool retain_dead_tuples)
     321             : {
     322             :     BackgroundWorker bgw;
     323             :     BackgroundWorkerHandle *bgw_handle;
     324             :     uint16      generation;
     325             :     int         i;
     326         846 :     int         slot = 0;
     327         846 :     LogicalRepWorker *worker = NULL;
     328             :     int         nsyncworkers;
     329             :     int         nparallelapplyworkers;
     330             :     TimestampTz now;
     331         846 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
     332         846 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
     333             : 
     334             :     /*----------
     335             :      * Sanity checks:
     336             :      * - must be valid worker type
      337             :      * - tablesync workers are the only ones to have relid
      338             :      * - a parallel apply worker is the only kind of subworker
     339             :      * - The replication slot used in conflict detection is created when
     340             :      *   retain_dead_tuples is enabled
     341             :      */
     342             :     Assert(wtype != WORKERTYPE_UNKNOWN);
     343             :     Assert(is_tablesync_worker == OidIsValid(relid));
     344             :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
     345             :     Assert(!retain_dead_tuples || MyReplicationSlot);
     346             : 
     347         846 :     ereport(DEBUG1,
     348             :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     349             :                              subname)));
     350             : 
     351             :     /* Report this after the initial starting message for consistency. */
     352         846 :     if (max_active_replication_origins == 0)
     353           0 :         ereport(ERROR,
     354             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     355             :                  errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
     356             : 
     357             :     /*
     358             :      * We need to do the modification of the shared memory under lock so that
      359             :      * we have a consistent view.
     360             :      */
     361         846 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     362             : 
     363         846 : retry:
     364             :     /* Find unused worker slot. */
     365        1458 :     for (i = 0; i < max_logical_replication_workers; i++)
     366             :     {
     367        1458 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     368             : 
     369        1458 :         if (!w->in_use)
     370             :         {
     371         846 :             worker = w;
     372         846 :             slot = i;
     373         846 :             break;
     374             :         }
     375             :     }
     376             : 
     377         846 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     378             : 
     379         846 :     now = GetCurrentTimestamp();
     380             : 
     381             :     /*
     382             :      * If we didn't find a free slot, try to do garbage collection.  The
      383             :      * reason we do this is that if some worker failed to start up and its
     384             :      * parent has crashed while waiting, the in_use state was never cleared.
     385             :      */
     386         846 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     387             :     {
     388           0 :         bool        did_cleanup = false;
     389             : 
     390           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     391             :         {
     392           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     393             : 
     394             :             /*
     395             :              * If the worker was marked in use but didn't manage to attach in
     396             :              * time, clean it up.
     397             :              */
     398           0 :             if (w->in_use && !w->proc &&
     399           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     400             :                                            wal_receiver_timeout))
     401             :             {
     402           0 :                 elog(WARNING,
     403             :                      "logical replication worker for subscription %u took too long to start; canceled",
     404             :                      w->subid);
     405             : 
     406           0 :                 logicalrep_worker_cleanup(w);
     407           0 :                 did_cleanup = true;
     408             :             }
     409             :         }
     410             : 
     411           0 :         if (did_cleanup)
     412           0 :             goto retry;
     413             :     }
     414             : 
     415             :     /*
      416             :      * We don't allow invoking more sync workers once we have reached the
     417             :      * sync worker limit per subscription. So, just return silently as we
     418             :      * might get here because of an otherwise harmless race condition.
     419             :      */
     420         846 :     if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
     421             :     {
     422           0 :         LWLockRelease(LogicalRepWorkerLock);
     423           0 :         return false;
     424             :     }
     425             : 
     426         846 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     427             : 
     428             :     /*
     429             :      * Return false if the number of parallel apply workers reached the limit
     430             :      * per subscription.
     431             :      */
     432         846 :     if (is_parallel_apply_worker &&
     433          20 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     434             :     {
     435           0 :         LWLockRelease(LogicalRepWorkerLock);
     436           0 :         return false;
     437             :     }
     438             : 
     439             :     /*
      440             :      * However, if there are no more free worker slots, inform the user about it
     441             :      * before exiting.
     442             :      */
     443         846 :     if (worker == NULL)
     444             :     {
     445           0 :         LWLockRelease(LogicalRepWorkerLock);
     446           0 :         ereport(WARNING,
     447             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     448             :                  errmsg("out of logical replication worker slots"),
     449             :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
     450           0 :         return false;
     451             :     }
     452             : 
     453             :     /* Prepare the worker slot. */
     454         846 :     worker->type = wtype;
     455         846 :     worker->launch_time = now;
     456         846 :     worker->in_use = true;
     457         846 :     worker->generation++;
     458         846 :     worker->proc = NULL;
     459         846 :     worker->dbid = dbid;
     460         846 :     worker->userid = userid;
     461         846 :     worker->subid = subid;
     462         846 :     worker->relid = relid;
     463         846 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     464         846 :     worker->relstate_lsn = InvalidXLogRecPtr;
     465         846 :     worker->stream_fileset = NULL;
     466         846 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     467         846 :     worker->parallel_apply = is_parallel_apply_worker;
     468         846 :     worker->oldest_nonremovable_xid = retain_dead_tuples
     469           4 :         ? MyReplicationSlot->data.xmin
     470         846 :         : InvalidTransactionId;
     471         846 :     worker->last_lsn = InvalidXLogRecPtr;
     472         846 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     473         846 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     474         846 :     worker->reply_lsn = InvalidXLogRecPtr;
     475         846 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     476             : 
     477             :     /* Before releasing lock, remember generation for future identification. */
     478         846 :     generation = worker->generation;
     479             : 
     480         846 :     LWLockRelease(LogicalRepWorkerLock);
     481             : 
     482             :     /* Register the new dynamic worker. */
     483         846 :     memset(&bgw, 0, sizeof(bgw));
     484         846 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     485             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     486         846 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     487         846 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     488             : 
     489         846 :     switch (worker->type)
     490             :     {
     491         430 :         case WORKERTYPE_APPLY:
     492         430 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     493         430 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     494             :                      "logical replication apply worker for subscription %u",
     495             :                      subid);
     496         430 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
     497         430 :             break;
     498             : 
     499          20 :         case WORKERTYPE_PARALLEL_APPLY:
     500          20 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     501          20 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     502             :                      "logical replication parallel apply worker for subscription %u",
     503             :                      subid);
     504          20 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     505             : 
     506          20 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     507          20 :             break;
     508             : 
     509         396 :         case WORKERTYPE_TABLESYNC:
     510         396 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
     511         396 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     512             :                      "logical replication tablesync worker for subscription %u sync %u",
     513             :                      subid,
     514             :                      relid);
     515         396 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
     516         396 :             break;
     517             : 
     518           0 :         case WORKERTYPE_UNKNOWN:
     519             :             /* Should never happen. */
     520           0 :             elog(ERROR, "unknown worker type");
     521             :     }
     522             : 
     523         846 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     524         846 :     bgw.bgw_notify_pid = MyProcPid;
     525         846 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     526             : 
     527         846 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     528             :     {
     529             :         /* Failed to start worker, so clean up the worker slot. */
     530           8 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     531             :         Assert(generation == worker->generation);
     532           8 :         logicalrep_worker_cleanup(worker);
     533           8 :         LWLockRelease(LogicalRepWorkerLock);
     534             : 
     535           8 :         ereport(WARNING,
     536             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     537             :                  errmsg("out of background worker slots"),
     538             :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
     539           8 :         return false;
     540             :     }
     541             : 
     542             :     /* Now wait until it attaches. */
     543         838 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     544             : }
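
A plausible call site, sketched for illustration (the real caller is
ApplyLauncherMain, outside this excerpt; "sub" stands for a Subscription
obtained from get_subscription_list()):

    /*
     * Start the leader apply worker for one enabled subscription.  A false
     * return means the worker could not be started (per-subscription limits,
     * no free slot, or a startup failure); the WARNING cases above have
     * already been reported.
     */
    (void) logicalrep_worker_launch(WORKERTYPE_APPLY,
                                    sub->dbid, sub->oid, sub->name, sub->owner,
                                    InvalidOid,         /* no relid: not tablesync */
                                    DSM_HANDLE_INVALID, /* no DSM: not parallel apply */
                                    false);             /* not retaining dead tuples */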
     545             : 
     546             : /*
     547             :  * Internal function to stop the worker and wait until it detaches from the
     548             :  * slot.
     549             :  */
     550             : static void
     551         162 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
     552             : {
     553             :     uint16      generation;
     554             : 
     555             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
     556             : 
     557             :     /*
      558             :      * Remember the generation of our worker so we can check whether what we
      559             :      * see is still the same one.
     560             :      */
     561         162 :     generation = worker->generation;
     562             : 
     563             :     /*
     564             :      * If we found a worker but it does not have proc set then it is still
     565             :      * starting up; wait for it to finish starting and then kill it.
     566             :      */
     567         166 :     while (worker->in_use && !worker->proc)
     568             :     {
     569             :         int         rc;
     570             : 
     571          10 :         LWLockRelease(LogicalRepWorkerLock);
     572             : 
     573             :         /* Wait a bit --- we don't expect to have to wait long. */
     574          10 :         rc = WaitLatch(MyLatch,
     575             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     576             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     577             : 
     578          10 :         if (rc & WL_LATCH_SET)
     579             :         {
     580           2 :             ResetLatch(MyLatch);
     581           2 :             CHECK_FOR_INTERRUPTS();
     582             :         }
     583             : 
     584             :         /* Recheck worker status. */
     585          10 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     586             : 
     587             :         /*
     588             :          * Check whether the worker slot is no longer used, which would mean
     589             :          * that the worker has exited, or whether the worker generation is
     590             :          * different, meaning that a different worker has taken the slot.
     591             :          */
     592          10 :         if (!worker->in_use || worker->generation != generation)
     593           0 :             return;
     594             : 
     595             :         /* Worker has assigned proc, so it has started. */
     596          10 :         if (worker->proc)
     597           6 :             break;
     598             :     }
     599             : 
     600             :     /* Now terminate the worker ... */
     601         162 :     kill(worker->proc->pid, signo);
     602             : 
     603             :     /* ... and wait for it to die. */
     604             :     for (;;)
     605         216 :     {
     606             :         int         rc;
     607             : 
     608             :         /* is it gone? */
     609         378 :         if (!worker->proc || worker->generation != generation)
     610             :             break;
     611             : 
     612         216 :         LWLockRelease(LogicalRepWorkerLock);
     613             : 
     614             :         /* Wait a bit --- we don't expect to have to wait long. */
     615         216 :         rc = WaitLatch(MyLatch,
     616             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     617             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     618             : 
     619         216 :         if (rc & WL_LATCH_SET)
     620             :         {
     621          96 :             ResetLatch(MyLatch);
     622          96 :             CHECK_FOR_INTERRUPTS();
     623             :         }
     624             : 
     625         216 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     626             :     }
     627             : }
     628             : 
     629             : /*
     630             :  * Stop the logical replication worker for subid/relid, if any.
     631             :  */
     632             : void
     633         188 : logicalrep_worker_stop(Oid subid, Oid relid)
     634             : {
     635             :     LogicalRepWorker *worker;
     636             : 
     637         188 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     638             : 
     639         188 :     worker = logicalrep_worker_find(subid, relid, false);
     640             : 
     641         188 :     if (worker)
     642             :     {
     643             :         Assert(!isParallelApplyWorker(worker));
     644         146 :         logicalrep_worker_stop_internal(worker, SIGTERM);
     645             :     }
     646             : 
     647         188 :     LWLockRelease(LogicalRepWorkerLock);
     648         188 : }
     649             : 
     650             : /*
     651             :  * Stop the given logical replication parallel apply worker.
     652             :  *
      653             :  * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
     654             :  * worker so that the worker exits cleanly.
     655             :  */
     656             : void
     657          10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
     658             : {
     659             :     int         slot_no;
     660             :     uint16      generation;
     661             :     LogicalRepWorker *worker;
     662             : 
     663          10 :     SpinLockAcquire(&winfo->shared->mutex);
     664          10 :     generation = winfo->shared->logicalrep_worker_generation;
     665          10 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
     666          10 :     SpinLockRelease(&winfo->shared->mutex);
     667             : 
     668             :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
     669             : 
     670             :     /*
     671             :      * Detach from the error_mq_handle for the parallel apply worker before
     672             :      * stopping it. This prevents the leader apply worker from trying to
      673             :      * receive a message from an error queue that the parallel apply worker
      674             :      * may already have detached.
     675             :      */
     676          10 :     if (winfo->error_mq_handle)
     677             :     {
     678          10 :         shm_mq_detach(winfo->error_mq_handle);
     679          10 :         winfo->error_mq_handle = NULL;
     680             :     }
     681             : 
     682          10 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     683             : 
     684          10 :     worker = &LogicalRepCtx->workers[slot_no];
     685             :     Assert(isParallelApplyWorker(worker));
     686             : 
     687             :     /*
     688             :      * Only stop the worker if the generation matches and the worker is alive.
     689             :      */
     690          10 :     if (worker->generation == generation && worker->proc)
     691          10 :         logicalrep_worker_stop_internal(worker, SIGINT);
     692             : 
     693          10 :     LWLockRelease(LogicalRepWorkerLock);
     694          10 : }
     695             : 
     696             : /*
      697             :  * Wake up (using latch) any logical replication worker for the specified sub/rel.
     698             :  */
     699             : void
     700         420 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     701             : {
     702             :     LogicalRepWorker *worker;
     703             : 
     704         420 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     705             : 
     706         420 :     worker = logicalrep_worker_find(subid, relid, true);
     707             : 
     708         420 :     if (worker)
     709         420 :         logicalrep_worker_wakeup_ptr(worker);
     710             : 
     711         420 :     LWLockRelease(LogicalRepWorkerLock);
     712         420 : }
     713             : 
     714             : /*
     715             :  * Wake up (using latch) the specified logical replication worker.
     716             :  *
     717             :  * Caller must hold lock, else worker->proc could change under us.
     718             :  */
     719             : void
     720        1280 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     721             : {
     722             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     723             : 
     724        1280 :     SetLatch(&worker->proc->procLatch);
     725        1280 : }
     726             : 
     727             : /*
     728             :  * Attach to a slot.
     729             :  */
     730             : void
     731        1084 : logicalrep_worker_attach(int slot)
     732             : {
     733             :     /* Block concurrent access. */
     734        1084 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     735             : 
     736             :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     737        1084 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     738             : 
     739        1084 :     if (!MyLogicalRepWorker->in_use)
     740             :     {
     741           0 :         LWLockRelease(LogicalRepWorkerLock);
     742           0 :         ereport(ERROR,
     743             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     744             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     745             :                         slot)));
     746             :     }
     747             : 
     748        1084 :     if (MyLogicalRepWorker->proc)
     749             :     {
     750           0 :         LWLockRelease(LogicalRepWorkerLock);
     751           0 :         ereport(ERROR,
     752             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     753             :                  errmsg("logical replication worker slot %d is already used by "
     754             :                         "another worker, cannot attach", slot)));
     755             :     }
     756             : 
     757        1084 :     MyLogicalRepWorker->proc = MyProc;
     758        1084 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     759             : 
     760        1084 :     LWLockRelease(LogicalRepWorkerLock);
     761        1084 : }
     762             : 
     763             : /*
     764             :  * Stop the parallel apply workers if any, and detach the leader apply worker
     765             :  * (cleans up the worker info).
     766             :  */
     767             : static void
     768        1084 : logicalrep_worker_detach(void)
     769             : {
     770             :     /* Stop the parallel apply workers. */
     771        1084 :     if (am_leader_apply_worker())
     772             :     {
     773             :         List       *workers;
     774             :         ListCell   *lc;
     775             : 
     776             :         /*
     777             :          * Detach from the error_mq_handle for all parallel apply workers
     778             :          * before terminating them. This prevents the leader apply worker from
      779             :          * receiving the worker termination message and writing it to the log
      780             :          * when the parallel worker has already done so.
     781             :          */
     782         672 :         pa_detach_all_error_mq();
     783             : 
     784         672 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     785             : 
     786         672 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
     787        1352 :         foreach(lc, workers)
     788             :         {
     789         680 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     790             : 
     791         680 :             if (isParallelApplyWorker(w))
     792           6 :                 logicalrep_worker_stop_internal(w, SIGTERM);
     793             :         }
     794             : 
     795         672 :         LWLockRelease(LogicalRepWorkerLock);
     796             : 
     797         672 :         list_free(workers);
     798             :     }
     799             : 
     800             :     /* Block concurrent access. */
     801        1084 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     802             : 
     803        1084 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     804             : 
     805        1084 :     LWLockRelease(LogicalRepWorkerLock);
     806        1084 : }
     807             : 
     808             : /*
     809             :  * Clean up worker info.
     810             :  */
     811             : static void
     812        1092 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     813             : {
     814             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     815             : 
     816        1092 :     worker->type = WORKERTYPE_UNKNOWN;
     817        1092 :     worker->in_use = false;
     818        1092 :     worker->proc = NULL;
     819        1092 :     worker->dbid = InvalidOid;
     820        1092 :     worker->userid = InvalidOid;
     821        1092 :     worker->subid = InvalidOid;
     822        1092 :     worker->relid = InvalidOid;
     823        1092 :     worker->leader_pid = InvalidPid;
     824        1092 :     worker->parallel_apply = false;
     825        1092 : }
     826             : 
     827             : /*
     828             :  * Cleanup function for logical replication launcher.
     829             :  *
     830             :  * Called on logical replication launcher exit.
     831             :  */
     832             : static void
     833         844 : logicalrep_launcher_onexit(int code, Datum arg)
     834             : {
     835         844 :     LogicalRepCtx->launcher_pid = 0;
     836         844 : }
     837             : 
     838             : /*
     839             :  * Cleanup function.
     840             :  *
     841             :  * Called on logical replication worker exit.
     842             :  */
     843             : static void
     844        1084 : logicalrep_worker_onexit(int code, Datum arg)
     845             : {
     846             :     /* Disconnect gracefully from the remote side. */
     847        1084 :     if (LogRepWorkerWalRcvConn)
     848         838 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
     849             : 
     850        1084 :     logicalrep_worker_detach();
     851             : 
     852             :     /* Cleanup fileset used for streaming transactions. */
     853        1084 :     if (MyLogicalRepWorker->stream_fileset != NULL)
     854          28 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     855             : 
     856             :     /*
     857             :      * Session level locks may be acquired outside of a transaction in
     858             :      * parallel apply mode and will not be released when the worker
     859             :      * terminates, so manually release all locks before the worker exits.
     860             :      *
     861             :      * The locks will be acquired once the worker is initialized.
     862             :      */
     863        1084 :     if (!InitializingApplyWorker)
     864         958 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
     865             : 
     866        1084 :     ApplyLauncherWakeup();
     867        1084 : }
     868             : 
     869             : /*
     870             :  * Count the number of registered (not necessarily running) sync workers
     871             :  * for a subscription.
     872             :  */
     873             : int
     874        2484 : logicalrep_sync_worker_count(Oid subid)
     875             : {
     876             :     int         i;
     877        2484 :     int         res = 0;
     878             : 
     879             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     880             : 
     881             :     /* Search for attached worker for a given subscription id. */
     882       12824 :     for (i = 0; i < max_logical_replication_workers; i++)
     883             :     {
     884       10340 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     885             : 
     886       10340 :         if (isTablesyncWorker(w) && w->subid == subid)
     887        2640 :             res++;
     888             :     }
     889             : 
     890        2484 :     return res;
     891             : }
     892             : 
     893             : /*
     894             :  * Count the number of registered (but not necessarily running) parallel apply
     895             :  * workers for a subscription.
     896             :  */
     897             : static int
     898         846 : logicalrep_pa_worker_count(Oid subid)
     899             : {
     900             :     int         i;
     901         846 :     int         res = 0;
     902             : 
     903             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     904             : 
     905             :     /*
     906             :      * Scan all attached parallel apply workers, only counting those which
     907             :      * have the given subscription id.
     908             :      */
     909        4478 :     for (i = 0; i < max_logical_replication_workers; i++)
     910             :     {
     911        3632 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     912             : 
     913        3632 :         if (isParallelApplyWorker(w) && w->subid == subid)
     914           4 :             res++;
     915             :     }
     916             : 
     917         846 :     return res;
     918             : }
     919             : 
     920             : /*
     921             :  * ApplyLauncherShmemSize
     922             :  *      Compute space needed for replication launcher shared memory
     923             :  */
     924             : Size
     925        8396 : ApplyLauncherShmemSize(void)
     926             : {
     927             :     Size        size;
     928             : 
     929             :     /*
     930             :      * Need the fixed struct and the array of LogicalRepWorker.
     931             :      */
     932        8396 :     size = sizeof(LogicalRepCtxStruct);
     933        8396 :     size = MAXALIGN(size);
     934        8396 :     size = add_size(size, mul_size(max_logical_replication_workers,
     935             :                                    sizeof(LogicalRepWorker)));
     936        8396 :     return size;
     937             : }
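
A worked instance of the computation, assuming the default of four worker
slots (struct sizes are platform-dependent, so they are shown symbolically):

    /*
     * size = MAXALIGN(sizeof(LogicalRepCtxStruct))
     *      + 4 * sizeof(LogicalRepWorker)
     *
     * add_size() and mul_size() are overflow-checked variants of + and *,
     * so an oversized GUC setting errors out instead of wrapping around.
     */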
     938             : 
     939             : /*
     940             :  * ApplyLauncherRegister
     941             :  *      Register a background worker running the logical replication launcher.
     942             :  */
     943             : void
     944        1740 : ApplyLauncherRegister(void)
     945             : {
     946             :     BackgroundWorker bgw;
     947             : 
     948             :     /*
     949             :      * The logical replication launcher is disabled during binary upgrades, to
     950             :      * prevent logical replication workers from running on the source cluster.
     951             :      * That could cause replication origins to move forward after having been
     952             :      * copied to the target cluster, potentially creating conflicts with the
     953             :      * copied data files.
     954             :      */
     955        1740 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
     956         106 :         return;
     957             : 
     958        1634 :     memset(&bgw, 0, sizeof(bgw));
     959        1634 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     960             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     961        1634 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     962        1634 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     963        1634 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     964        1634 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     965             :              "logical replication launcher");
     966        1634 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
     967             :              "logical replication launcher");
     968        1634 :     bgw.bgw_restart_time = 5;
     969        1634 :     bgw.bgw_notify_pid = 0;
     970        1634 :     bgw.bgw_main_arg = (Datum) 0;
     971             : 
     972        1634 :     RegisterBackgroundWorker(&bgw);
     973             : }
     974             : 
     975             : /*
     976             :  * ApplyLauncherShmemInit
     977             :  *      Allocate and initialize replication launcher shared memory
     978             :  */
     979             : void
     980        2174 : ApplyLauncherShmemInit(void)
     981             : {
     982             :     bool        found;
     983             : 
     984        2174 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     985        2174 :         ShmemInitStruct("Logical Replication Launcher Data",
     986             :                         ApplyLauncherShmemSize(),
     987             :                         &found);
     988             : 
     989        2174 :     if (!found)
     990             :     {
     991             :         int         slot;
     992             : 
     993        2174 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     994             : 
     995        2174 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
     996        2174 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
     997             : 
     998             :         /* Initialize memory and spin locks for each worker slot. */
     999       10796 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
    1000             :         {
    1001        8622 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
    1002             : 
    1003        8622 :             memset(worker, 0, sizeof(LogicalRepWorker));
    1004        8622 :             SpinLockInit(&worker->relmutex);
    1005             :         }
    1006             :     }
    1007        2174 : }
    1008             : 
    1009             : /*
    1010             :  * Initialize or attach to the dynamic shared hash table that stores the
    1011             :  * last-start times, if not already done.
    1012             :  * This must be called before accessing the table.
    1013             :  */
    1014             : static void
    1015        1554 : logicalrep_launcher_attach_dshmem(void)
    1016             : {
    1017             :     MemoryContext oldcontext;
    1018             : 
    1019             :     /* Quick exit if we already did this. */
    1020        1554 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
    1021        1444 :         last_start_times != NULL)
    1022        1066 :         return;
    1023             : 
    1024             :     /* Otherwise, use a lock to ensure only one process creates the table. */
    1025         488 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    1026             : 
    1027             :     /* Be sure any local memory allocated by DSA routines is persistent. */
    1028         488 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1029             : 
    1030         488 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
    1031             :     {
    1032             :         /* Initialize dynamic shared hash table for last-start times. */
    1033         110 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
    1034         110 :         dsa_pin(last_start_times_dsa);
    1035         110 :         dsa_pin_mapping(last_start_times_dsa);
    1036         110 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
    1037             : 
    1038             :         /* Store handles in shared memory for other backends to use. */
    1039         110 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
    1040         110 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
    1041             :     }
    1042         378 :     else if (!last_start_times)
    1043             :     {
    1044             :         /* Attach to existing dynamic shared hash table. */
    1045         378 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
    1046         378 :         dsa_pin_mapping(last_start_times_dsa);
    1047         378 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
    1048         378 :                                          LogicalRepCtx->last_start_dsh, NULL);
    1049             :     }
    1050             : 
    1051         488 :     MemoryContextSwitchTo(oldcontext);
    1052         488 :     LWLockRelease(LogicalRepWorkerLock);
    1053             : }
    1054             : 
    1055             : /*
    1056             :  * Set the last-start time for the subscription.
    1057             :  */
    1058             : static void
    1059         430 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
    1060             : {
    1061             :     LauncherLastStartTimesEntry *entry;
    1062             :     bool        found;
    1063             : 
    1064         430 :     logicalrep_launcher_attach_dshmem();
    1065             : 
    1066         430 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
    1067         430 :     entry->last_start_time = start_time;
    1068         430 :     dshash_release_lock(last_start_times, entry);
    1069         430 : }
    1070             : 
    1071             : /*
    1072             :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1073             :  */
    1074             : static TimestampTz
    1075         668 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1076             : {
    1077             :     LauncherLastStartTimesEntry *entry;
    1078             :     TimestampTz ret;
    1079             : 
    1080         668 :     logicalrep_launcher_attach_dshmem();
    1081             : 
    1082         668 :     entry = dshash_find(last_start_times, &subid, false);
    1083         668 :     if (entry == NULL)
    1084         264 :         return 0;
    1085             : 
    1086         404 :     ret = entry->last_start_time;
    1087         404 :     dshash_release_lock(last_start_times, entry);
    1088             : 
    1089         404 :     return ret;
    1090             : }
    1091             : 
    1092             : /*
    1093             :  * Remove the last-start-time entry for the subscription, if one exists.
    1094             :  *
    1095             :  * This has two use cases: to remove the entry related to a subscription
    1096             :  * that's been deleted or disabled (just to avoid leaking shared memory),
    1097             :  * and to allow immediate restart of an apply worker that has exited
    1098             :  * due to subscription parameter changes.
    1099             :  */
    1100             : void
    1101         456 : ApplyLauncherForgetWorkerStartTime(Oid subid)
    1102             : {
    1103         456 :     logicalrep_launcher_attach_dshmem();
    1104             : 
    1105         456 :     (void) dshash_delete_key(last_start_times, &subid);
    1106         456 : }
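
Taken together, these three accessors give the launcher a per-subscription
restart throttle: set on launch, get to decide eligibility, forget to allow
an immediate restart. A hedged sketch of how the main loop below combines
the first two (restart_allowed() and interval_ms are illustrative names,
not launcher.c symbols):

    static bool
    restart_allowed(Oid subid, long interval_ms)
    {
        TimestampTz last_start = ApplyLauncherGetWorkerStartTime(subid);
        TimestampTz now = GetCurrentTimestamp();

        /* Permit the first start, or any start once the interval elapsed. */
        if (last_start == 0 ||
            TimestampDifferenceMilliseconds(last_start, now) >= interval_ms)
        {
            ApplyLauncherSetWorkerStartTime(subid, now);
            return true;
        }

        return false;
    }

ApplyLauncherForgetWorkerStartTime() is the escape hatch: deleting the entry
makes the next eligibility check succeed immediately.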
    1107             : 
    1108             : /*
    1109             :  * Wake up the launcher on commit if requested.
    1110             :  */
    1111             : void
    1112     1021192 : AtEOXact_ApplyLauncher(bool isCommit)
    1113             : {
    1114     1021192 :     if (isCommit)
    1115             :     {
    1116      970794 :         if (on_commit_launcher_wakeup)
    1117         286 :             ApplyLauncherWakeup();
    1118             :     }
    1119             : 
    1120     1021192 :     on_commit_launcher_wakeup = false;
    1121     1021192 : }
    1122             : 
    1123             : /*
    1124             :  * Request wakeup of the launcher on commit of the transaction.
    1125             :  *
    1126             :  * This is used to signal the launcher to stop sleeping and process the
    1127             :  * subscriptions when the current transaction commits. It should be used
    1128             :  * when a new tuple has been added to the pg_subscription catalog.
    1129             :  */
    1130             : void
    1131         288 : ApplyLauncherWakeupAtCommit(void)
    1132             : {
    1133         288 :     if (!on_commit_launcher_wakeup)
    1134         286 :         on_commit_launcher_wakeup = true;
    1135         288 : }
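
A hedged fragment of how a catalog-writing path (for example, CREATE
SUBSCRIPTION) is expected to use this; rel and tup are illustrative:

    /* ... after forming the new pg_subscription tuple ... */
    CatalogTupleInsert(rel, tup);

    /*
     * Request a launcher wakeup at commit.  If the transaction aborts,
     * AtEOXact_ApplyLauncher() simply clears the flag and no signal is
     * sent.
     */
    ApplyLauncherWakeupAtCommit();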
    1136             : 
    1137             : /*
    1138             :  * Wake up the launcher immediately.
    1139             :  */
    1140             : void
    1141        1448 : ApplyLauncherWakeup(void)
    1142             : {
    1143        1448 :     if (LogicalRepCtx->launcher_pid != 0)
    1144        1430 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1145        1448 : }
    1146             : 
    1147             : /*
    1148             :  * Main loop for the apply launcher process.
    1149             :  */
    1150             : void
    1151         844 : ApplyLauncherMain(Datum main_arg)
    1152             : {
    1153         844 :     ereport(DEBUG1,
    1154             :             (errmsg_internal("logical replication launcher started")));
    1155             : 
    1156         844 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1157             : 
    1158             :     Assert(LogicalRepCtx->launcher_pid == 0);
    1159         844 :     LogicalRepCtx->launcher_pid = MyProcPid;
    1160             : 
    1161             :     /* Establish signal handlers. */
    1162         844 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1163         844 :     pqsignal(SIGTERM, die);
    1164         844 :     BackgroundWorkerUnblockSignals();
    1165             : 
    1166             :     /*
    1167             :      * Establish connection to nailed catalogs (we only ever access
    1168             :      * pg_subscription).
    1169             :      */
    1170         844 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1171             : 
    1172             :     /*
    1173             :      * Acquire the conflict detection slot at startup to ensure it can be
    1174             :      * dropped if no longer needed after a restart.
    1175             :      */
    1176         844 :     acquire_conflict_slot_if_exists();
    1177             : 
    1178             :     /* Enter main loop */
    1179             :     for (;;)
    1180        4856 :     {
    1181             :         int         rc;
    1182             :         List       *sublist;
    1183             :         ListCell   *lc;
    1184             :         MemoryContext subctx;
    1185             :         MemoryContext oldctx;
    1186        5700 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1187        5700 :         bool        can_update_xmin = true;
    1188        5700 :         bool        retain_dead_tuples = false;
    1189        5700 :         TransactionId xmin = InvalidTransactionId;
    1190             : 
    1191        5700 :         CHECK_FOR_INTERRUPTS();
    1192             : 
    1193             :         /* Use temporary context to avoid leaking memory across cycles. */
    1194        5696 :         subctx = AllocSetContextCreate(TopMemoryContext,
    1195             :                                        "Logical Replication Launcher sublist",
    1196             :                                        ALLOCSET_DEFAULT_SIZES);
    1197        5696 :         oldctx = MemoryContextSwitchTo(subctx);
    1198             : 
    1199             :         /*
    1200             :          * Start any missing workers for enabled subscriptions.
    1201             :          *
    1202             :          * Also, while iterating through all subscriptions, we compute the
    1203             :          * minimum XID required to protect deleted tuples for conflict
    1204             :          * detection if any subscription has the retain_dead_tuples option
    1205             :          * enabled.
    1206             :          */
    1207        5696 :         sublist = get_subscription_list();
    1208        7528 :         foreach(lc, sublist)
    1209             :         {
    1210        1832 :             Subscription *sub = (Subscription *) lfirst(lc);
    1211             :             LogicalRepWorker *w;
    1212             :             TimestampTz last_start;
    1213             :             TimestampTz now;
    1214             :             long        elapsed;
    1215             : 
    1216        1832 :             if (sub->retaindeadtuples)
    1217             :             {
    1218         136 :                 retain_dead_tuples = true;
    1219             : 
    1220             :                 /*
    1221             :                  * Create a replication slot to retain information necessary
    1222             :                  * for conflict detection such as dead tuples, commit
    1223             :                  * timestamps, and origins.
    1224             :                  *
    1225             :                  * The slot is created before starting the apply worker to
    1226             :                  * prevent it from unnecessarily maintaining its
    1227             :                  * oldest_nonremovable_xid.
    1228             :                  *
    1229             :                  * The slot is created even for a disabled subscription to
    1230             :                  * ensure that conflict-related information is available when
    1231             :                  * applying remote changes that occurred before the
    1232             :                  * subscription was enabled.
    1233             :                  */
    1234         136 :                 CreateConflictDetectionSlot();
    1235             : 
    1236         136 :                 if (sub->retentionactive)
    1237             :                 {
    1238             :                     /*
    1239             :                      * Can't advance the xmin of the slot unless all the
    1240             :                      * subscriptions actively retaining dead tuples are
    1241             :                      * enabled.  Otherwise, we might advance the xmin of
    1242             :                      * CONFLICT_DETECTION_SLOT while one of those
    1243             :                      * subscriptions is disabled, and we then could not
    1244             :                      * detect conflicts reliably for that subscription
    1245             :                      * even though it has set the retain_dead_tuples
    1246             :                      * option.
    1247             :                      */
    1248         136 :                     can_update_xmin &= sub->enabled;
    1249             : 
    1250             :                     /*
    1251             :                      * Initialize the slot once the subscription activates
    1252             :                      * retention.
    1253             :                      */
    1254         136 :                     if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
    1255           0 :                         init_conflict_slot_xmin();
    1256             :                 }
    1257             :             }
    1258             : 
    1259        1832 :             if (!sub->enabled)
    1260          76 :                 continue;
    1261             : 
    1262        1756 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1263        1756 :             w = logicalrep_worker_find(sub->oid, InvalidOid, false);
    1264        1756 :             LWLockRelease(LogicalRepWorkerLock);
    1265             : 
    1266        1756 :             if (w != NULL)
    1267             :             {
    1268             :                 /*
    1269             :                  * Compute, across all running apply workers, the minimum
    1270             :                  * xmin needed to protect the dead tuples required for
    1271             :                  * conflict detection.
    1272             :                  */
    1273        1088 :                 if (sub->retaindeadtuples &&
    1274         128 :                     sub->retentionactive &&
    1275             :                     can_update_xmin)
    1276         128 :                     compute_min_nonremovable_xid(w, &xmin);
    1277             : 
    1278             :                 /* worker is running already */
    1279        1088 :                 continue;
    1280             :             }
    1281             : 
    1282             :             /*
    1283             :              * Can't advance the xmin of the slot unless all the workers
    1284             :              * corresponding to subscriptions actively retaining dead tuples
    1285             :              * are running, so disable any further computation of the
    1286             :              * minimum nonremovable xid.
    1287             :              */
    1288         668 :             if (sub->retaindeadtuples && sub->retentionactive)
    1289           4 :                 can_update_xmin = false;
    1290             : 
    1291             :             /*
    1292             :              * If the worker is eligible to start now, launch it.  Otherwise,
    1293             :              * adjust wait_time so that we'll wake up as soon as it can be
    1294             :              * started.
    1295             :              *
    1296             :              * Each subscription's apply worker can only be restarted once per
    1297             :              * wal_retrieve_retry_interval, so that errors do not cause us to
    1298             :              * repeatedly restart the worker as fast as possible.  In cases
    1299             :              * where a restart is expected (e.g., subscription parameter
    1300             :              * changes), another process should remove the last-start entry
    1301             :              * for the subscription so that the worker can be restarted
    1302             :              * without waiting for wal_retrieve_retry_interval to elapse.
    1303             :              */
    1304         668 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1305         668 :             now = GetCurrentTimestamp();
    1306         668 :             if (last_start == 0 ||
    1307         404 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1308             :             {
    1309         430 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1310         430 :                 if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
    1311         430 :                                               sub->dbid, sub->oid, sub->name,
    1312             :                                               sub->owner, InvalidOid,
    1313             :                                               DSM_HANDLE_INVALID,
    1314         434 :                                               sub->retaindeadtuples &&
    1315           4 :                                               sub->retentionactive))
    1316             :                 {
    1317             :                     /*
    1318             :                      * We get here either if we failed to launch a worker
    1319             :                      * (perhaps for resource-exhaustion reasons) or if we
    1320             :                      * launched one but it immediately quit.  Either way, it
    1321             :                      * seems appropriate to try again after
    1322             :                      * wal_retrieve_retry_interval.
    1323             :                      */
    1324          16 :                     wait_time = Min(wait_time,
    1325             :                                     wal_retrieve_retry_interval);
    1326             :                 }
    1327             :             }
    1328             :             else
    1329             :             {
    1330         238 :                 wait_time = Min(wait_time,
    1331             :                                 wal_retrieve_retry_interval - elapsed);
    1332             :             }
    1333             :         }
    1334             : 
    1335             :         /*
    1336             :          * Drop the CONFLICT_DETECTION_SLOT if there is no subscription
    1337             :          * that requires us to retain dead tuples. Otherwise, if required,
    1338             :          * advance the slot's xmin to protect dead tuples required for
    1339             :          * conflict detection.
    1340             :          *
    1341             :          * Additionally, if all apply workers for subscriptions with
    1342             :          * retain_dead_tuples enabled have requested to stop retention, the
    1343             :          * slot's xmin will be set to InvalidTransactionId, allowing the
    1344             :          * removal of dead tuples.
    1345             :          */
    1346        5696 :         if (MyReplicationSlot)
    1347             :         {
    1348         138 :             if (!retain_dead_tuples)
    1349           2 :                 ReplicationSlotDropAcquired();
    1350         136 :             else if (can_update_xmin)
    1351         128 :                 update_conflict_slot_xmin(xmin);
    1352             :         }
    1353             : 
    1354             :         /* Switch back to original memory context. */
    1355        5696 :         MemoryContextSwitchTo(oldctx);
    1356             :         /* Clean the temporary memory. */
    1357        5696 :         MemoryContextDelete(subctx);
    1358             : 
    1359             :         /* Wait for more work. */
    1360        5696 :         rc = WaitLatch(MyLatch,
    1361             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1362             :                        wait_time,
    1363             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1364             : 
    1365        5690 :         if (rc & WL_LATCH_SET)
    1366             :         {
    1367        5626 :             ResetLatch(MyLatch);
    1368        5626 :             CHECK_FOR_INTERRUPTS();
    1369             :         }
    1370             : 
    1371        4856 :         if (ConfigReloadPending)
    1372             :         {
    1373          72 :             ConfigReloadPending = false;
    1374          72 :             ProcessConfigFile(PGC_SIGHUP);
    1375             :         }
    1376             :     }
    1377             : 
    1378             :     /* Not reachable */
    1379             : }
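
The body above is the standard latch-sleep pattern for PostgreSQL background
processes; stripped of launcher-specific work it reduces to the following
skeleton, where do_pending_work() and timeout_ms are placeholders:

    for (;;)
    {
        int         rc;

        CHECK_FOR_INTERRUPTS();
        do_pending_work();      /* one cycle of work, recomputing timeout_ms */

        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                       timeout_ms,
                       WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);

        if (rc & WL_LATCH_SET)
        {
            /* Reset before re-checking for work to avoid missed wakeups. */
            ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
        }
    }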
    1380             : 
    1381             : /*
    1382             :  * Determine the minimum non-removable transaction ID across all apply workers
    1383             :  * for subscriptions that have retain_dead_tuples enabled. Store the result
    1384             :  * in *xmin.
    1385             :  */
    1386             : static void
    1387         128 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
    1388             : {
    1389             :     TransactionId nonremovable_xid;
    1390             : 
    1391             :     Assert(worker != NULL);
    1392             : 
    1393             :     /*
    1394             :      * The replication slot for conflict detection must be created before the
    1395             :      * worker starts.
    1396             :      */
    1397             :     Assert(MyReplicationSlot);
    1398             : 
    1399         128 :     SpinLockAcquire(&worker->relmutex);
    1400         128 :     nonremovable_xid = worker->oldest_nonremovable_xid;
    1401         128 :     SpinLockRelease(&worker->relmutex);
    1402             : 
    1403             :     /*
    1404             :      * Return if the apply worker has stopped retention concurrently.
    1405             :      *
    1406             :      * Although this function is invoked only when retentionactive is true,
    1407             :      * the apply worker might stop retention after the launcher fetches the
    1408             :      * retentionactive flag.
    1409             :      */
    1410         128 :     if (!TransactionIdIsValid(nonremovable_xid))
    1411           0 :         return;
    1412             : 
    1413         128 :     if (!TransactionIdIsValid(*xmin) ||
    1414           0 :         TransactionIdPrecedes(nonremovable_xid, *xmin))
    1415         128 :         *xmin = nonremovable_xid;
    1416             : }
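
The main loop above folds this function over each running apply worker to
obtain a global minimum; a sketch of that aggregation, with workers[] and
nworkers as illustrative stand-ins:

    TransactionId xmin = InvalidTransactionId;

    for (int i = 0; i < nworkers; i++)
        compute_min_nonremovable_xid(&workers[i], &xmin);

    /*
     * TransactionIdPrecedes() is a circular, wraparound-aware comparison,
     * so the fold is safe across XID wraparound; xmin remains
     * InvalidTransactionId if no worker reported a valid value.
     */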
    1417             : 
    1418             : /*
    1419             :  * Acquire the replication slot used to retain information for conflict
    1420             :  * detection, if it exists.
    1421             :  *
    1422             :  * Return true if successfully acquired, otherwise return false.
    1423             :  */
    1424             : static bool
    1425         844 : acquire_conflict_slot_if_exists(void)
    1426             : {
    1427         844 :     if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
    1428         842 :         return false;
    1429             : 
    1430           2 :     ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
    1431           2 :     return true;
    1432             : }
    1433             : 
    1434             : /*
    1435             :  * Update the xmin of the replication slot used to retain information
    1436             :  * required for conflict detection.
    1437             :  */
    1438             : static void
    1439         128 : update_conflict_slot_xmin(TransactionId new_xmin)
    1440             : {
    1441             :     Assert(MyReplicationSlot);
    1442             :     Assert(!TransactionIdIsValid(new_xmin) ||
    1443             :            TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
    1444             : 
    1445             :     /* Return if the xmin value of the slot needs no update */
    1446         128 :     if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
    1447          96 :         return;
    1448             : 
    1449          32 :     SpinLockAcquire(&MyReplicationSlot->mutex);
    1450          32 :     MyReplicationSlot->effective_xmin = new_xmin;
    1451          32 :     MyReplicationSlot->data.xmin = new_xmin;
    1452          32 :     SpinLockRelease(&MyReplicationSlot->mutex);
    1453             : 
    1454          32 :     elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
    1455             : 
    1456          32 :     ReplicationSlotMarkDirty();
    1457          32 :     ReplicationSlotsComputeRequiredXmin(false);
    1458             : 
    1459             :     /*
    1460             :      * Like PhysicalConfirmReceivedLocation(), do not save slot information
    1461             :      * each time. This is acceptable because all concurrent transactions on
    1462             :      * the publisher that require the data preceding the slot's xmin should
    1463             :      * have already been applied and flushed on the subscriber before the xmin
    1464             :      * is advanced. So, even if the slot's xmin regresses after a restart, it
    1465             :      * will be advanced again in the next cycle. Therefore, no data required
    1466             :      * for conflict detection will be prematurely removed.
    1467             :      */
    1468          32 :     return;
    1469             : }
    1470             : 
    1471             : /*
    1472             :  * Initialize the xmin for the conflict detection slot.
    1473             :  */
    1474             : static void
    1475           8 : init_conflict_slot_xmin(void)
    1476             : {
    1477             :     TransactionId xmin_horizon;
    1478             : 
    1479             :     /* Replication slot must exist but shouldn't be initialized. */
    1480             :     Assert(MyReplicationSlot &&
    1481             :            !TransactionIdIsValid(MyReplicationSlot->data.xmin));
    1482             : 
    1483           8 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    1484             : 
    1485           8 :     xmin_horizon = GetOldestSafeDecodingTransactionId(false);
    1486             : 
    1487           8 :     SpinLockAcquire(&MyReplicationSlot->mutex);
    1488           8 :     MyReplicationSlot->effective_xmin = xmin_horizon;
    1489           8 :     MyReplicationSlot->data.xmin = xmin_horizon;
    1490           8 :     SpinLockRelease(&MyReplicationSlot->mutex);
    1491             : 
    1492           8 :     ReplicationSlotsComputeRequiredXmin(true);
    1493             : 
    1494           8 :     LWLockRelease(ProcArrayLock);
    1495             : 
    1496             :     /* Write this slot to disk */
    1497           8 :     ReplicationSlotMarkDirty();
    1498           8 :     ReplicationSlotSave();
    1499           8 : }
    1500             : 
    1501             : /*
    1502             :  * Create and acquire the replication slot used to retain information for
    1503             :  * conflict detection, if it does not exist yet.
    1504             :  */
    1505             : void
    1506         138 : CreateConflictDetectionSlot(void)
    1507             : {
    1508             :     /* Exit early if the replication slot is already created and acquired */
    1509         138 :     if (MyReplicationSlot)
    1510         130 :         return;
    1511             : 
    1512           8 :     ereport(LOG,
    1513             :             errmsg("creating replication conflict detection slot"));
    1514             : 
    1515           8 :     ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
    1516             :                           false, false);
    1517             : 
    1518           8 :     init_conflict_slot_xmin();
    1519             : }
    1520             : 
    1521             : /*
    1522             :  * Is current process the logical replication launcher?
    1523             :  */
    1524             : bool
    1525        4906 : IsLogicalLauncher(void)
    1526             : {
    1527        4906 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1528             : }
    1529             : 
    1530             : /*
    1531             :  * Return the pid of the leader apply worker if the given pid is the pid of a
    1532             :  * parallel apply worker, otherwise, return InvalidPid.
    1533             :  * parallel apply worker; otherwise, return InvalidPid.
    1534             : pid_t
    1535        1950 : GetLeaderApplyWorkerPid(pid_t pid)
    1536             : {
    1537        1950 :     int         leader_pid = InvalidPid;
    1538             :     int         i;
    1539             : 
    1540        1950 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1541             : 
    1542        9750 :     for (i = 0; i < max_logical_replication_workers; i++)
    1543             :     {
    1544        7800 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1545             : 
    1546        7800 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
    1547             :         {
    1548           0 :             leader_pid = w->leader_pid;
    1549           0 :             break;
    1550             :         }
    1551             :     }
    1552             : 
    1553        1950 :     LWLockRelease(LogicalRepWorkerLock);
    1554             : 
    1555        1950 :     return leader_pid;
    1556             : }
    1557             : 
    1558             : /*
    1559             :  * Returns the state of the subscriptions.
    1560             :  */
    1561             : Datum
    1562           2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1563             : {
    1564             : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
    1565           2 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1566             :     int         i;
    1567           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1568             : 
    1569           2 :     InitMaterializedSRF(fcinfo, 0);
    1570             : 
    1571             :     /* Make sure we get a consistent view of the workers. */
    1572           2 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1573             : 
    1574          10 :     for (i = 0; i < max_logical_replication_workers; i++)
    1575             :     {
    1576             :         /* for each row */
    1577           8 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1578           8 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1579             :         int         worker_pid;
    1580             :         LogicalRepWorker worker;
    1581             : 
    1582           8 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1583             :                sizeof(LogicalRepWorker));
    1584           8 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1585           4 :             continue;
    1586             : 
    1587           4 :         if (OidIsValid(subid) && worker.subid != subid)
    1588           0 :             continue;
    1589             : 
    1590           4 :         worker_pid = worker.proc->pid;
    1591             : 
    1592           4 :         values[0] = ObjectIdGetDatum(worker.subid);
    1593           4 :         if (isTablesyncWorker(&worker))
    1594           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1595             :         else
    1596           4 :             nulls[1] = true;
    1597           4 :         values[2] = Int32GetDatum(worker_pid);
    1598             : 
    1599           4 :         if (isParallelApplyWorker(&worker))
    1600           0 :             values[3] = Int32GetDatum(worker.leader_pid);
    1601             :         else
    1602           4 :             nulls[3] = true;
    1603             : 
    1604           4 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
    1605           0 :             nulls[4] = true;
    1606             :         else
    1607           4 :             values[4] = LSNGetDatum(worker.last_lsn);
    1608           4 :         if (worker.last_send_time == 0)
    1609           0 :             nulls[5] = true;
    1610             :         else
    1611           4 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
    1612           4 :         if (worker.last_recv_time == 0)
    1613           0 :             nulls[6] = true;
    1614             :         else
    1615           4 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1616           4 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1617           0 :             nulls[7] = true;
    1618             :         else
    1619           4 :             values[7] = LSNGetDatum(worker.reply_lsn);
    1620           4 :         if (worker.reply_time == 0)
    1621           0 :             nulls[8] = true;
    1622             :         else
    1623           4 :             values[8] = TimestampTzGetDatum(worker.reply_time);
    1624             : 
    1625           4 :         switch (worker.type)
    1626             :         {
    1627           4 :             case WORKERTYPE_APPLY:
    1628           4 :                 values[9] = CStringGetTextDatum("apply");
    1629           4 :                 break;
    1630           0 :             case WORKERTYPE_PARALLEL_APPLY:
    1631           0 :                 values[9] = CStringGetTextDatum("parallel apply");
    1632           0 :                 break;
    1633           0 :             case WORKERTYPE_TABLESYNC:
    1634           0 :                 values[9] = CStringGetTextDatum("table synchronization");
    1635           0 :                 break;
    1636           0 :             case WORKERTYPE_UNKNOWN:
    1637             :                 /* Should never happen. */
    1638           0 :                 elog(ERROR, "unknown worker type");
    1639             :         }
    1640             : 
    1641           4 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1642             :                              values, nulls);
    1643             : 
    1644             :         /*
    1645             :          * If only a single subscription was requested and we found it,
    1646             :          * break.
    1647             :          */
    1648           4 :         if (OidIsValid(subid))
    1649           0 :             break;
    1650             :     }
    1651             : 
    1652           2 :     LWLockRelease(LogicalRepWorkerLock);
    1653             : 
    1654           2 :     return (Datum) 0;
    1655             : }
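
The function uses the materialized set-returning-function protocol:
InitMaterializedSRF() builds a tuplestore and result descriptor from the
function's declared return type, each row is appended with
tuplestore_putvalues(), and the Datum return value is unused. A
self-contained sketch of just that protocol, where my_one_row_srf is a
hypothetical function declared at SQL level with a single int4 OUT column:

    #include "postgres.h"

    #include "funcapi.h"
    #include "utils/tuplestore.h"

    PG_MODULE_MAGIC;

    PG_FUNCTION_INFO_V1(my_one_row_srf);

    Datum
    my_one_row_srf(PG_FUNCTION_ARGS)
    {
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
        Datum       values[1] = {0};
        bool        nulls[1] = {0};

        /* Set up rsinfo->setResult / setDesc from the declared return type. */
        InitMaterializedSRF(fcinfo, 0);

        /* Emit one row; a real SRF loops here, as the function above does. */
        values[0] = Int32GetDatum(42);
        tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);

        return (Datum) 0;
    }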

Generated by: LCOV version 1.16