LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions)
Test: PostgreSQL 19devel
Date: 2025-07-29 03:18:01

                    Hit   Total   Coverage
        Lines:      482     533     90.4 %
        Functions:   35      35    100.0 %

Legend: Lines: hit | not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
      11             :  *    This module contains the logical replication worker launcher which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "access/heapam.h"
      21             : #include "access/htup.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/pg_subscription.h"
      26             : #include "catalog/pg_subscription_rel.h"
      27             : #include "funcapi.h"
      28             : #include "lib/dshash.h"
      29             : #include "miscadmin.h"
      30             : #include "pgstat.h"
      31             : #include "postmaster/bgworker.h"
      32             : #include "postmaster/interrupt.h"
      33             : #include "replication/logicallauncher.h"
      34             : #include "replication/origin.h"
      35             : #include "replication/slot.h"
      36             : #include "replication/walreceiver.h"
      37             : #include "replication/worker_internal.h"
      38             : #include "storage/ipc.h"
      39             : #include "storage/proc.h"
      40             : #include "storage/procarray.h"
      41             : #include "tcop/tcopprot.h"
      42             : #include "utils/builtins.h"
      43             : #include "utils/memutils.h"
      44             : #include "utils/pg_lsn.h"
      45             : #include "utils/snapmgr.h"
      46             : 
      47             : /* max sleep time between cycles (3min) */
      48             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      49             : 
      50             : /* GUC variables */
      51             : int         max_logical_replication_workers = 4;
      52             : int         max_sync_workers_per_subscription = 2;
      53             : int         max_parallel_apply_workers_per_subscription = 2;
      54             : 
      55             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      56             : 
      57             : typedef struct LogicalRepCtxStruct
      58             : {
      59             :     /* Supervisor process. */
      60             :     pid_t       launcher_pid;
      61             : 
      62             :     /* Hash table holding last start times of subscriptions' apply workers. */
      63             :     dsa_handle  last_start_dsa;
      64             :     dshash_table_handle last_start_dsh;
      65             : 
      66             :     /* Background workers. */
      67             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      68             : } LogicalRepCtxStruct;
      69             : 
      70             : static LogicalRepCtxStruct *LogicalRepCtx;
      71             : 
      72             : /* an entry in the last-start-times shared hash table */
      73             : typedef struct LauncherLastStartTimesEntry
      74             : {
      75             :     Oid         subid;          /* OID of logrep subscription (hash key) */
      76             :     TimestampTz last_start_time;    /* last time its apply worker was started */
      77             : } LauncherLastStartTimesEntry;
      78             : 
      79             : /* parameters for the last-start-times shared hash table */
      80             : static const dshash_parameters dsh_params = {
      81             :     sizeof(Oid),
      82             :     sizeof(LauncherLastStartTimesEntry),
      83             :     dshash_memcmp,
      84             :     dshash_memhash,
      85             :     dshash_memcpy,
      86             :     LWTRANCHE_LAUNCHER_HASH
      87             : };
      88             : 
      89             : static dsa_area *last_start_times_dsa = NULL;
      90             : static dshash_table *last_start_times = NULL;
      91             : 
      92             : static bool on_commit_launcher_wakeup = false;
      93             : 
      94             : 
      95             : static void logicalrep_launcher_onexit(int code, Datum arg);
      96             : static void logicalrep_worker_onexit(int code, Datum arg);
      97             : static void logicalrep_worker_detach(void);
      98             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
      99             : static int  logicalrep_pa_worker_count(Oid subid);
     100             : static void logicalrep_launcher_attach_dshmem(void);
     101             : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
     102             : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
     103             : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
     104             : static bool acquire_conflict_slot_if_exists(void);
     105             : static void advance_conflict_slot_xmin(TransactionId new_xmin);
     106             : 
     107             : 
     108             : /*
     109             :  * Load the list of subscriptions.
     110             :  *
      111             :  * Only the fields of interest to the worker start/stop functions are filled
      112             :  * in for each subscription.
     113             :  */
     114             : static List *
     115        5318 : get_subscription_list(void)
     116             : {
     117        5318 :     List       *res = NIL;
     118             :     Relation    rel;
     119             :     TableScanDesc scan;
     120             :     HeapTuple   tup;
     121             :     MemoryContext resultcxt;
     122             : 
     123             :     /* This is the context that we will allocate our output data in */
     124        5318 :     resultcxt = CurrentMemoryContext;
     125             : 
     126             :     /*
     127             :      * Start a transaction so we can access pg_subscription.
     128             :      */
     129        5318 :     StartTransactionCommand();
     130             : 
     131        5318 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     132        5318 :     scan = table_beginscan_catalog(rel, 0, NULL);
     133             : 
     134        6702 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     135             :     {
     136        1384 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     137             :         Subscription *sub;
     138             :         MemoryContext oldcxt;
     139             : 
     140             :         /*
     141             :          * Allocate our results in the caller's context, not the
     142             :          * transaction's. We do this inside the loop, and restore the original
     143             :          * context at the end, so that leaky things like heap_getnext() are
     144             :          * not called in a potentially long-lived context.
     145             :          */
     146        1384 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     147             : 
     148        1384 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     149        1384 :         sub->oid = subform->oid;
     150        1384 :         sub->dbid = subform->subdbid;
     151        1384 :         sub->owner = subform->subowner;
     152        1384 :         sub->enabled = subform->subenabled;
     153        1384 :         sub->name = pstrdup(NameStr(subform->subname));
     154        1384 :         sub->retaindeadtuples = subform->subretaindeadtuples;
     155             :         /* We don't fill fields we are not interested in. */
     156             : 
     157        1384 :         res = lappend(res, sub);
     158        1384 :         MemoryContextSwitchTo(oldcxt);
     159             :     }
     160             : 
     161        5318 :     table_endscan(scan);
     162        5318 :     table_close(rel, AccessShareLock);
     163             : 
     164        5318 :     CommitTransactionCommand();
     165             : 
     166        5318 :     return res;
     167             : }
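
The launcher's main loop (outside this excerpt) consumes this list roughly as below; a minimal caller sketch, assuming backend context, with start-time throttling and error handling omitted:

    List       *sublist = get_subscription_list();
    ListCell   *lc;

    foreach(lc, sublist)
    {
        Subscription *sub = (Subscription *) lfirst(lc);

        if (!sub->enabled)
            continue;           /* only enabled subscriptions get workers */

        /* ... decide whether to (re)start the apply worker for sub->oid ... */
    }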
     168             : 
     169             : /*
     170             :  * Wait for a background worker to start up and attach to the shmem context.
     171             :  *
     172             :  * This is only needed for cleaning up the shared memory in case the worker
     173             :  * fails to attach.
     174             :  *
     175             :  * Returns whether the attach was successful.
     176             :  */
     177             : static bool
     178         718 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     179             :                                uint16 generation,
     180             :                                BackgroundWorkerHandle *handle)
     181             : {
     182         718 :     bool        result = false;
     183         718 :     bool        dropped_latch = false;
     184             : 
     185             :     for (;;)
     186        3228 :     {
     187             :         BgwHandleStatus status;
     188             :         pid_t       pid;
     189             :         int         rc;
     190             : 
     191        3946 :         CHECK_FOR_INTERRUPTS();
     192             : 
     193        3946 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     194             : 
     195             :         /* Worker either died or has started. Return false if died. */
     196        3946 :         if (!worker->in_use || worker->proc)
     197             :         {
     198         718 :             result = worker->in_use;
     199         718 :             LWLockRelease(LogicalRepWorkerLock);
     200         718 :             break;
     201             :         }
     202             : 
     203        3228 :         LWLockRelease(LogicalRepWorkerLock);
     204             : 
     205             :         /* Check if worker has died before attaching, and clean up after it. */
     206        3228 :         status = GetBackgroundWorkerPid(handle, &pid);
     207             : 
     208        3228 :         if (status == BGWH_STOPPED)
     209             :         {
     210           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     211             :             /* Ensure that this was indeed the worker we waited for. */
     212           0 :             if (generation == worker->generation)
     213           0 :                 logicalrep_worker_cleanup(worker);
     214           0 :             LWLockRelease(LogicalRepWorkerLock);
     215           0 :             break;              /* result is already false */
     216             :         }
     217             : 
     218             :         /*
      219             :          * We need a timeout because we generally don't get notified via latch
     220             :          * about the worker attach.  But we don't expect to have to wait long.
     221             :          */
     222        3228 :         rc = WaitLatch(MyLatch,
     223             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     224             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     225             : 
     226        3228 :         if (rc & WL_LATCH_SET)
     227             :         {
     228         852 :             ResetLatch(MyLatch);
     229         852 :             CHECK_FOR_INTERRUPTS();
     230         852 :             dropped_latch = true;
     231             :         }
     232             :     }
     233             : 
     234             :     /*
     235             :      * If we had to clear a latch event in order to wait, be sure to restore
     236             :      * it before exiting.  Otherwise caller may miss events.
     237             :      */
     238         718 :     if (dropped_latch)
     239         718 :         SetLatch(MyLatch);
     240             : 
     241         718 :     return result;
     242             : }
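
The dropped-latch bookkeeping above is a reusable pattern whenever a helper waits on MyLatch that its caller also relies on; distilled to its essentials (done_waiting() is a hypothetical exit test):

    bool        dropped_latch = false;

    for (;;)
    {
        int         rc = WaitLatch(MyLatch,
                                   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                   10L, WAIT_EVENT_BGWORKER_STARTUP);

        if (rc & WL_LATCH_SET)
        {
            ResetLatch(MyLatch);    /* may consume an event meant for the caller */
            dropped_latch = true;
        }

        if (done_waiting())         /* hypothetical exit condition */
            break;
    }

    if (dropped_latch)
        SetLatch(MyLatch);          /* re-post it so the caller doesn't miss it */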
     243             : 
     244             : /*
      245             :  * Walks the workers array and searches for one that matches the given
     246             :  * subscription id and relid.
     247             :  *
     248             :  * We are only interested in the leader apply worker or table sync worker.
     249             :  */
     250             : LogicalRepWorker *
     251        5780 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     252             : {
     253             :     int         i;
     254        5780 :     LogicalRepWorker *res = NULL;
     255             : 
     256             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     257             : 
      258             :     /* Search for an attached worker for the given subscription id. */
     259       17542 :     for (i = 0; i < max_logical_replication_workers; i++)
     260             :     {
     261       15342 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     262             : 
     263             :         /* Skip parallel apply workers. */
     264       15342 :         if (isParallelApplyWorker(w))
     265           0 :             continue;
     266             : 
     267       15342 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     268        3580 :             (!only_running || w->proc))
     269             :         {
     270        3580 :             res = w;
     271        3580 :             break;
     272             :         }
     273             :     }
     274             : 
     275        5780 :     return res;
     276             : }
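
The returned pointer addresses shared memory, so callers must hold LogicalRepWorkerLock across both the lookup and any use of the result; a minimal sketch (mirroring logicalrep_worker_wakeup() below):

    LogicalRepWorker *worker;

    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

    /* InvalidOid as relid matches the leader apply worker, not a tablesync. */
    worker = logicalrep_worker_find(subid, InvalidOid, true);
    if (worker)
        logicalrep_worker_wakeup_ptr(worker);

    LWLockRelease(LogicalRepWorkerLock);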
     277             : 
     278             : /*
     279             :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     280             :  * the subscription, instead of just one.
     281             :  */
     282             : List *
     283        1138 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
     284             : {
     285             :     int         i;
     286        1138 :     List       *res = NIL;
     287             : 
     288        1138 :     if (acquire_lock)
     289         228 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     290             : 
     291             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     292             : 
      293             :     /* Search for attached workers for the given subscription id. */
     294        5874 :     for (i = 0; i < max_logical_replication_workers; i++)
     295             :     {
     296        4736 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     297             : 
     298        4736 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     299         796 :             res = lappend(res, w);
     300             :     }
     301             : 
     302        1138 :     if (acquire_lock)
     303         228 :         LWLockRelease(LogicalRepWorkerLock);
     304             : 
     305        1138 :     return res;
     306             : }
     307             : 
     308             : /*
     309             :  * Start new logical replication background worker, if possible.
     310             :  *
     311             :  * Returns true on success, false on failure.
     312             :  */
     313             : bool
     314         732 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
     315             :                          Oid dbid, Oid subid, const char *subname, Oid userid,
     316             :                          Oid relid, dsm_handle subworker_dsm,
     317             :                          bool retain_dead_tuples)
     318             : {
     319             :     BackgroundWorker bgw;
     320             :     BackgroundWorkerHandle *bgw_handle;
     321             :     uint16      generation;
     322             :     int         i;
     323         732 :     int         slot = 0;
     324         732 :     LogicalRepWorker *worker = NULL;
     325             :     int         nsyncworkers;
     326             :     int         nparallelapplyworkers;
     327             :     TimestampTz now;
     328         732 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
     329         732 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
     330             : 
     331             :     /*----------
     332             :      * Sanity checks:
     333             :      * - must be valid worker type
      334             :      * - tablesync workers are the only ones to have a relid
     335             :      * - parallel apply worker is the only kind of subworker
     336             :      * - The replication slot used in conflict detection is created when
     337             :      *   retain_dead_tuples is enabled
     338             :      */
     339             :     Assert(wtype != WORKERTYPE_UNKNOWN);
     340             :     Assert(is_tablesync_worker == OidIsValid(relid));
     341             :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
     342             :     Assert(!retain_dead_tuples || MyReplicationSlot);
     343             : 
     344         732 :     ereport(DEBUG1,
     345             :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     346             :                              subname)));
     347             : 
     348             :     /* Report this after the initial starting message for consistency. */
     349         732 :     if (max_active_replication_origins == 0)
     350           0 :         ereport(ERROR,
     351             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     352             :                  errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
     353             : 
     354             :     /*
      355             :      * We must modify the shared memory under lock so that we have a
      356             :      * consistent view.
     357             :      */
     358         732 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     359             : 
     360         732 : retry:
     361             :     /* Find unused worker slot. */
     362        1332 :     for (i = 0; i < max_logical_replication_workers; i++)
     363             :     {
     364        1332 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     365             : 
     366        1332 :         if (!w->in_use)
     367             :         {
     368         732 :             worker = w;
     369         732 :             slot = i;
     370         732 :             break;
     371             :         }
     372             :     }
     373             : 
     374         732 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     375             : 
     376         732 :     now = GetCurrentTimestamp();
     377             : 
     378             :     /*
     379             :      * If we didn't find a free slot, try to do garbage collection.  The
      380             :      * If we didn't find a free slot, try to do garbage collection.  We do
      381             :      * this because if some worker failed to start up and its parent crashed
      382             :      * while waiting, the in_use state was never cleared.
     383         732 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     384             :     {
     385           0 :         bool        did_cleanup = false;
     386             : 
     387           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     388             :         {
     389           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     390             : 
     391             :             /*
     392             :              * If the worker was marked in use but didn't manage to attach in
     393             :              * time, clean it up.
     394             :              */
     395           0 :             if (w->in_use && !w->proc &&
     396           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     397             :                                            wal_receiver_timeout))
     398             :             {
     399           0 :                 elog(WARNING,
     400             :                      "logical replication worker for subscription %u took too long to start; canceled",
     401             :                      w->subid);
     402             : 
     403           0 :                 logicalrep_worker_cleanup(w);
     404           0 :                 did_cleanup = true;
     405             :             }
     406             :         }
     407             : 
     408           0 :         if (did_cleanup)
     409           0 :             goto retry;
     410             :     }
     411             : 
     412             :     /*
      413             :      * We don't allow invoking more sync workers once we have reached the
      414             :      * sync worker limit per subscription.  So just return silently, as we
      415             :      * might get here because of an otherwise harmless race condition.
     416             :      */
     417         732 :     if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
     418             :     {
     419           0 :         LWLockRelease(LogicalRepWorkerLock);
     420           0 :         return false;
     421             :     }
     422             : 
     423         732 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     424             : 
     425             :     /*
     426             :      * Return false if the number of parallel apply workers reached the limit
     427             :      * per subscription.
     428             :      */
     429         732 :     if (is_parallel_apply_worker &&
     430          20 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     431             :     {
     432           0 :         LWLockRelease(LogicalRepWorkerLock);
     433           0 :         return false;
     434             :     }
     435             : 
     436             :     /*
      437             :      * However, if there are no free worker slots left, inform the user
      438             :      * before exiting.
     439             :      */
     440         732 :     if (worker == NULL)
     441             :     {
     442           0 :         LWLockRelease(LogicalRepWorkerLock);
     443           0 :         ereport(WARNING,
     444             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     445             :                  errmsg("out of logical replication worker slots"),
     446             :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
     447           0 :         return false;
     448             :     }
     449             : 
     450             :     /* Prepare the worker slot. */
     451         732 :     worker->type = wtype;
     452         732 :     worker->launch_time = now;
     453         732 :     worker->in_use = true;
     454         732 :     worker->generation++;
     455         732 :     worker->proc = NULL;
     456         732 :     worker->dbid = dbid;
     457         732 :     worker->userid = userid;
     458         732 :     worker->subid = subid;
     459         732 :     worker->relid = relid;
     460         732 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     461         732 :     worker->relstate_lsn = InvalidXLogRecPtr;
     462         732 :     worker->stream_fileset = NULL;
     463         732 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     464         732 :     worker->parallel_apply = is_parallel_apply_worker;
     465         732 :     worker->oldest_nonremovable_xid = retain_dead_tuples
     466           2 :         ? MyReplicationSlot->data.xmin
     467         732 :         : InvalidTransactionId;
     468         732 :     worker->last_lsn = InvalidXLogRecPtr;
     469         732 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     470         732 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     471         732 :     worker->reply_lsn = InvalidXLogRecPtr;
     472         732 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     473             : 
     474             :     /* Before releasing lock, remember generation for future identification. */
     475         732 :     generation = worker->generation;
     476             : 
     477         732 :     LWLockRelease(LogicalRepWorkerLock);
     478             : 
     479             :     /* Register the new dynamic worker. */
     480         732 :     memset(&bgw, 0, sizeof(bgw));
     481         732 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     482             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     483         732 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     484         732 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     485             : 
     486         732 :     switch (worker->type)
     487             :     {
     488         310 :         case WORKERTYPE_APPLY:
     489         310 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     490         310 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     491             :                      "logical replication apply worker for subscription %u",
     492             :                      subid);
     493         310 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
     494         310 :             break;
     495             : 
     496          20 :         case WORKERTYPE_PARALLEL_APPLY:
     497          20 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     498          20 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     499             :                      "logical replication parallel apply worker for subscription %u",
     500             :                      subid);
     501          20 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     502             : 
     503          20 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     504          20 :             break;
     505             : 
     506         402 :         case WORKERTYPE_TABLESYNC:
     507         402 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
     508         402 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     509             :                      "logical replication tablesync worker for subscription %u sync %u",
     510             :                      subid,
     511             :                      relid);
     512         402 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
     513         402 :             break;
     514             : 
     515           0 :         case WORKERTYPE_UNKNOWN:
     516             :             /* Should never happen. */
     517           0 :             elog(ERROR, "unknown worker type");
     518             :     }
     519             : 
     520         732 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     521         732 :     bgw.bgw_notify_pid = MyProcPid;
     522         732 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     523             : 
     524         732 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     525             :     {
     526             :         /* Failed to start worker, so clean up the worker slot. */
     527          14 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     528             :         Assert(generation == worker->generation);
     529          14 :         logicalrep_worker_cleanup(worker);
     530          14 :         LWLockRelease(LogicalRepWorkerLock);
     531             : 
     532          14 :         ereport(WARNING,
     533             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     534             :                  errmsg("out of background worker slots"),
     535             :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
     536          14 :         return false;
     537             :     }
     538             : 
     539             :     /* Now wait until it attaches. */
     540         718 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     541             : }
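
A typical invocation, sketched for a leader apply worker (tablesync callers pass a valid relid instead, and parallel apply callers a valid DSM handle; sub is assumed to come from get_subscription_list()):

    (void) logicalrep_worker_launch(WORKERTYPE_APPLY,
                                    sub->dbid, sub->oid, sub->name, sub->owner,
                                    InvalidOid,         /* no relid: not tablesync */
                                    DSM_HANDLE_INVALID, /* no DSM: not parallel apply */
                                    sub->retaindeadtuples);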
     542             : 
     543             : /*
     544             :  * Internal function to stop the worker and wait until it detaches from the
     545             :  * slot.
     546             :  */
     547             : static void
     548         164 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
     549             : {
     550             :     uint16      generation;
     551             : 
     552             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
     553             : 
     554             :     /*
      555             :      * Remember our worker's generation so we can later check whether what
      556             :      * we see is still the same worker.
     557             :      */
     558         164 :     generation = worker->generation;
     559             : 
     560             :     /*
     561             :      * If we found a worker but it does not have proc set then it is still
     562             :      * starting up; wait for it to finish starting and then kill it.
     563             :      */
     564         168 :     while (worker->in_use && !worker->proc)
     565             :     {
     566             :         int         rc;
     567             : 
     568          10 :         LWLockRelease(LogicalRepWorkerLock);
     569             : 
     570             :         /* Wait a bit --- we don't expect to have to wait long. */
     571          10 :         rc = WaitLatch(MyLatch,
     572             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     573             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     574             : 
     575          10 :         if (rc & WL_LATCH_SET)
     576             :         {
     577           2 :             ResetLatch(MyLatch);
     578           2 :             CHECK_FOR_INTERRUPTS();
     579             :         }
     580             : 
     581             :         /* Recheck worker status. */
     582          10 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     583             : 
     584             :         /*
     585             :          * Check whether the worker slot is no longer used, which would mean
     586             :          * that the worker has exited, or whether the worker generation is
     587             :          * different, meaning that a different worker has taken the slot.
     588             :          */
     589          10 :         if (!worker->in_use || worker->generation != generation)
     590           0 :             return;
     591             : 
     592             :         /* Worker has assigned proc, so it has started. */
     593          10 :         if (worker->proc)
     594           6 :             break;
     595             :     }
     596             : 
     597             :     /* Now terminate the worker ... */
     598         164 :     kill(worker->proc->pid, signo);
     599             : 
     600             :     /* ... and wait for it to die. */
     601             :     for (;;)
     602         214 :     {
     603             :         int         rc;
     604             : 
     605             :         /* is it gone? */
     606         378 :         if (!worker->proc || worker->generation != generation)
     607             :             break;
     608             : 
     609         214 :         LWLockRelease(LogicalRepWorkerLock);
     610             : 
     611             :         /* Wait a bit --- we don't expect to have to wait long. */
     612         214 :         rc = WaitLatch(MyLatch,
     613             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     614             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     615             : 
     616         214 :         if (rc & WL_LATCH_SET)
     617             :         {
     618          98 :             ResetLatch(MyLatch);
     619          98 :             CHECK_FOR_INTERRUPTS();
     620             :         }
     621             : 
     622         214 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     623             :     }
     624             : }
     625             : 
     626             : /*
     627             :  * Stop the logical replication worker for subid/relid, if any.
     628             :  */
     629             : void
     630         188 : logicalrep_worker_stop(Oid subid, Oid relid)
     631             : {
     632             :     LogicalRepWorker *worker;
     633             : 
     634         188 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     635             : 
     636         188 :     worker = logicalrep_worker_find(subid, relid, false);
     637             : 
     638         188 :     if (worker)
     639             :     {
     640             :         Assert(!isParallelApplyWorker(worker));
     641         148 :         logicalrep_worker_stop_internal(worker, SIGTERM);
     642             :     }
     643             : 
     644         188 :     LWLockRelease(LogicalRepWorkerLock);
     645         188 : }
     646             : 
     647             : /*
     648             :  * Stop the given logical replication parallel apply worker.
     649             :  *
      650             :  * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
     651             :  * worker so that the worker exits cleanly.
     652             :  */
     653             : void
     654          10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
     655             : {
     656             :     int         slot_no;
     657             :     uint16      generation;
     658             :     LogicalRepWorker *worker;
     659             : 
     660          10 :     SpinLockAcquire(&winfo->shared->mutex);
     661          10 :     generation = winfo->shared->logicalrep_worker_generation;
     662          10 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
     663          10 :     SpinLockRelease(&winfo->shared->mutex);
     664             : 
     665             :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
     666             : 
     667             :     /*
     668             :      * Detach from the error_mq_handle for the parallel apply worker before
     669             :      * stopping it. This prevents the leader apply worker from trying to
      670             :      * stopping it.  This prevents the leader apply worker from trying to
      671             :      * receive a message from an error queue that the parallel apply worker
      672             :      * might already have detached.
     673          10 :     if (winfo->error_mq_handle)
     674             :     {
     675          10 :         shm_mq_detach(winfo->error_mq_handle);
     676          10 :         winfo->error_mq_handle = NULL;
     677             :     }
     678             : 
     679          10 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     680             : 
     681          10 :     worker = &LogicalRepCtx->workers[slot_no];
     682             :     Assert(isParallelApplyWorker(worker));
     683             : 
     684             :     /*
     685             :      * Only stop the worker if the generation matches and the worker is alive.
     686             :      */
     687          10 :     if (worker->generation == generation && worker->proc)
     688          10 :         logicalrep_worker_stop_internal(worker, SIGINT);
     689             : 
     690          10 :     LWLockRelease(LogicalRepWorkerLock);
     691          10 : }
     692             : 
     693             : /*
      694             :  * Wake up (using latch) any logical replication worker for the specified sub/rel.
     695             :  */
     696             : void
     697         420 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     698             : {
     699             :     LogicalRepWorker *worker;
     700             : 
     701         420 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     702             : 
     703         420 :     worker = logicalrep_worker_find(subid, relid, true);
     704             : 
     705         420 :     if (worker)
     706         420 :         logicalrep_worker_wakeup_ptr(worker);
     707             : 
     708         420 :     LWLockRelease(LogicalRepWorkerLock);
     709         420 : }
     710             : 
     711             : /*
     712             :  * Wake up (using latch) the specified logical replication worker.
     713             :  *
     714             :  * Caller must hold lock, else worker->proc could change under us.
     715             :  */
     716             : void
     717        1270 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     718             : {
     719             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     720             : 
     721        1270 :     SetLatch(&worker->proc->procLatch);
     722        1270 : }
     723             : 
     724             : /*
     725             :  * Attach to a slot.
     726             :  */
     727             : void
     728         926 : logicalrep_worker_attach(int slot)
     729             : {
     730             :     /* Block concurrent access. */
     731         926 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     732             : 
     733             :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     734         926 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     735             : 
     736         926 :     if (!MyLogicalRepWorker->in_use)
     737             :     {
     738           0 :         LWLockRelease(LogicalRepWorkerLock);
     739           0 :         ereport(ERROR,
     740             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     741             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     742             :                         slot)));
     743             :     }
     744             : 
     745         926 :     if (MyLogicalRepWorker->proc)
     746             :     {
     747           0 :         LWLockRelease(LogicalRepWorkerLock);
     748           0 :         ereport(ERROR,
     749             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     750             :                  errmsg("logical replication worker slot %d is already used by "
     751             :                         "another worker, cannot attach", slot)));
     752             :     }
     753             : 
     754         926 :     MyLogicalRepWorker->proc = MyProc;
     755         926 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     756             : 
     757         926 :     LWLockRelease(LogicalRepWorkerLock);
     758         926 : }
     759             : 
     760             : /*
     761             :  * Stop the parallel apply workers if any, and detach the leader apply worker
     762             :  * (cleans up the worker info).
     763             :  */
     764             : static void
     765         926 : logicalrep_worker_detach(void)
     766             : {
     767             :     /* Stop the parallel apply workers. */
     768         926 :     if (am_leader_apply_worker())
     769             :     {
     770             :         List       *workers;
     771             :         ListCell   *lc;
     772             : 
     773             :         /*
     774             :          * Detach from the error_mq_handle for all parallel apply workers
      775             :          * before terminating them.  This prevents the leader apply worker from
      776             :          * receiving the worker termination message and writing it to the log
      777             :          * when the parallel worker has already done so itself.
     778             :          */
     779         514 :         pa_detach_all_error_mq();
     780             : 
     781         514 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     782             : 
     783         514 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
     784        1040 :         foreach(lc, workers)
     785             :         {
     786         526 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     787             : 
     788         526 :             if (isParallelApplyWorker(w))
     789           6 :                 logicalrep_worker_stop_internal(w, SIGTERM);
     790             :         }
     791             : 
     792         514 :         LWLockRelease(LogicalRepWorkerLock);
     793             :     }
     794             : 
     795             :     /* Block concurrent access. */
     796         926 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     797             : 
     798         926 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     799             : 
     800         926 :     LWLockRelease(LogicalRepWorkerLock);
     801         926 : }
     802             : 
     803             : /*
     804             :  * Clean up worker info.
     805             :  */
     806             : static void
     807         940 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     808             : {
     809             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     810             : 
     811         940 :     worker->type = WORKERTYPE_UNKNOWN;
     812         940 :     worker->in_use = false;
     813         940 :     worker->proc = NULL;
     814         940 :     worker->dbid = InvalidOid;
     815         940 :     worker->userid = InvalidOid;
     816         940 :     worker->subid = InvalidOid;
     817         940 :     worker->relid = InvalidOid;
     818         940 :     worker->leader_pid = InvalidPid;
     819         940 :     worker->parallel_apply = false;
     820         940 : }
     821             : 
     822             : /*
     823             :  * Cleanup function for logical replication launcher.
     824             :  *
     825             :  * Called on logical replication launcher exit.
     826             :  */
     827             : static void
     828         818 : logicalrep_launcher_onexit(int code, Datum arg)
     829             : {
     830         818 :     LogicalRepCtx->launcher_pid = 0;
     831         818 : }
     832             : 
     833             : /*
     834             :  * Cleanup function.
     835             :  *
     836             :  * Called on logical replication worker exit.
     837             :  */
     838             : static void
     839         926 : logicalrep_worker_onexit(int code, Datum arg)
     840             : {
     841             :     /* Disconnect gracefully from the remote side. */
     842         926 :     if (LogRepWorkerWalRcvConn)
     843         816 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
     844             : 
     845         926 :     logicalrep_worker_detach();
     846             : 
     847             :     /* Cleanup fileset used for streaming transactions. */
     848         926 :     if (MyLogicalRepWorker->stream_fileset != NULL)
     849          28 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     850             : 
     851             :     /*
      852             :      * Session-level locks may be acquired outside of a transaction in
     853             :      * parallel apply mode and will not be released when the worker
     854             :      * terminates, so manually release all locks before the worker exits.
     855             :      *
     856             :      * The locks will be acquired once the worker is initialized.
     857             :      */
     858         926 :     if (!InitializingApplyWorker)
     859         920 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
     860             : 
     861         926 :     ApplyLauncherWakeup();
     862         926 : }
     863             : 
     864             : /*
     865             :  * Count the number of registered (not necessarily running) sync workers
     866             :  * for a subscription.
     867             :  */
     868             : int
     869        2432 : logicalrep_sync_worker_count(Oid subid)
     870             : {
     871             :     int         i;
     872        2432 :     int         res = 0;
     873             : 
     874             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     875             : 
      876             :     /* Count registered tablesync workers for the given subscription id. */
     877       12504 :     for (i = 0; i < max_logical_replication_workers; i++)
     878             :     {
     879       10072 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     880             : 
     881       10072 :         if (isTablesyncWorker(w) && w->subid == subid)
     882        2700 :             res++;
     883             :     }
     884             : 
     885        2432 :     return res;
     886             : }
     887             : 
     888             : /*
     889             :  * Count the number of registered (but not necessarily running) parallel apply
     890             :  * workers for a subscription.
     891             :  */
     892             : static int
     893         732 : logicalrep_pa_worker_count(Oid subid)
     894             : {
     895             :     int         i;
     896         732 :     int         res = 0;
     897             : 
     898             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     899             : 
     900             :     /*
     901             :      * Scan all attached parallel apply workers, only counting those which
     902             :      * have the given subscription id.
     903             :      */
     904        3848 :     for (i = 0; i < max_logical_replication_workers; i++)
     905             :     {
     906        3116 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     907             : 
     908        3116 :         if (isParallelApplyWorker(w) && w->subid == subid)
     909           4 :             res++;
     910             :     }
     911             : 
     912         732 :     return res;
     913             : }
     914             : 
     915             : /*
     916             :  * ApplyLauncherShmemSize
     917             :  *      Compute space needed for replication launcher shared memory
     918             :  */
     919             : Size
     920        8302 : ApplyLauncherShmemSize(void)
     921             : {
     922             :     Size        size;
     923             : 
     924             :     /*
     925             :      * Need the fixed struct and the array of LogicalRepWorker.
     926             :      */
     927        8302 :     size = sizeof(LogicalRepCtxStruct);
     928        8302 :     size = MAXALIGN(size);
     929        8302 :     size = add_size(size, mul_size(max_logical_replication_workers,
     930             :                                    sizeof(LogicalRepWorker)));
     931        8302 :     return size;
     932             : }
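
Because workers[] is a flexible array member, sizeof(LogicalRepCtxStruct) covers only the fixed fields and the per-worker array must be added explicitly. A sketch of the arithmetic for the default max_logical_replication_workers = 4:

    Size        size = MAXALIGN(sizeof(LogicalRepCtxStruct));      /* fixed part */

    size = add_size(size, mul_size(4, sizeof(LogicalRepWorker)));  /* + 4 slots */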
     933             : 
     934             : /*
     935             :  * ApplyLauncherRegister
     936             :  *      Register a background worker running the logical replication launcher.
     937             :  */
     938             : void
     939        1730 : ApplyLauncherRegister(void)
     940             : {
     941             :     BackgroundWorker bgw;
     942             : 
     943             :     /*
     944             :      * The logical replication launcher is disabled during binary upgrades, to
     945             :      * prevent logical replication workers from running on the source cluster.
     946             :      * That could cause replication origins to move forward after having been
     947             :      * copied to the target cluster, potentially creating conflicts with the
     948             :      * copied data files.
     949             :      */
     950        1730 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
     951         106 :         return;
     952             : 
     953        1624 :     memset(&bgw, 0, sizeof(bgw));
     954        1624 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     955             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     956        1624 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     957        1624 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     958        1624 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     959        1624 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     960             :              "logical replication launcher");
     961        1624 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
     962             :              "logical replication launcher");
     963        1624 :     bgw.bgw_restart_time = 5;
     964        1624 :     bgw.bgw_notify_pid = 0;
     965        1624 :     bgw.bgw_main_arg = (Datum) 0;
     966             : 
     967        1624 :     RegisterBackgroundWorker(&bgw);
     968             : }
     969             : 
     970             : /*
     971             :  * ApplyLauncherShmemInit
     972             :  *      Allocate and initialize replication launcher shared memory
     973             :  */
     974             : void
     975        2152 : ApplyLauncherShmemInit(void)
     976             : {
     977             :     bool        found;
     978             : 
     979        2152 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     980        2152 :         ShmemInitStruct("Logical Replication Launcher Data",
     981             :                         ApplyLauncherShmemSize(),
     982             :                         &found);
     983             : 
     984        2152 :     if (!found)
     985             :     {
     986             :         int         slot;
     987             : 
     988        2152 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     989             : 
     990        2152 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
     991        2152 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
     992             : 
     993             :         /* Initialize memory and spin locks for each worker slot. */
     994       10686 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
     995             :         {
     996        8534 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
     997             : 
     998        8534 :             memset(worker, 0, sizeof(LogicalRepWorker));
     999        8534 :             SpinLockInit(&worker->relmutex);
    1000             :         }
    1001             :     }
    1002        2152 : }
    1003             : 
    1004             : /*
    1005             :  * Initialize or attach to the dynamic shared hash table that stores the
    1006             :  * last-start times, if not already done.
    1007             :  * This must be called before accessing the table.
    1008             :  */
    1009             : static void
    1010        1090 : logicalrep_launcher_attach_dshmem(void)
    1011             : {
    1012             :     MemoryContext oldcontext;
    1013             : 
    1014             :     /* Quick exit if we already did this. */
    1015        1090 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
    1016         984 :         last_start_times != NULL)
    1017         734 :         return;
    1018             : 
    1019             :     /* Otherwise, use a lock to ensure only one process creates the table. */
    1020         356 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    1021             : 
    1022             :     /* Be sure any local memory allocated by DSA routines is persistent. */
    1023         356 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1024             : 
    1025         356 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
    1026             :     {
    1027             :         /* Initialize dynamic shared hash table for last-start times. */
    1028         106 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
    1029         106 :         dsa_pin(last_start_times_dsa);
    1030         106 :         dsa_pin_mapping(last_start_times_dsa);
    1031         106 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
    1032             : 
    1033             :         /* Store handles in shared memory for other backends to use. */
    1034         106 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
    1035         106 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
    1036             :     }
    1037         250 :     else if (!last_start_times)
    1038             :     {
    1039             :         /* Attach to existing dynamic shared hash table. */
    1040         250 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
    1041         250 :         dsa_pin_mapping(last_start_times_dsa);
    1042         250 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
    1043         250 :                                          LogicalRepCtx->last_start_dsh, NULL);
    1044             :     }
    1045             : 
    1046         356 :     MemoryContextSwitchTo(oldcontext);
    1047         356 :     LWLockRelease(LogicalRepWorkerLock);
    1048             : }
    1049             : 
    1050             : /*
    1051             :  * Set the last-start time for the subscription.
    1052             :  */
    1053             : static void
    1054         310 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
    1055             : {
    1056             :     LauncherLastStartTimesEntry *entry;
    1057             :     bool        found;
    1058             : 
    1059         310 :     logicalrep_launcher_attach_dshmem();
    1060             : 
    1061         310 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
    1062         310 :     entry->last_start_time = start_time;
    1063         310 :     dshash_release_lock(last_start_times, entry);
    1064         310 : }
    1065             : 
    1066             : /*
    1067             :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1068             :  */
    1069             : static TimestampTz
    1070         460 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1071             : {
    1072             :     LauncherLastStartTimesEntry *entry;
    1073             :     TimestampTz ret;
    1074             : 
    1075         460 :     logicalrep_launcher_attach_dshmem();
    1076             : 
    1077         460 :     entry = dshash_find(last_start_times, &subid, false);
    1078         460 :     if (entry == NULL)
    1079         248 :         return 0;
    1080             : 
    1081         212 :     ret = entry->last_start_time;
    1082         212 :     dshash_release_lock(last_start_times, entry);
    1083             : 
    1084         212 :     return ret;
    1085             : }
    1086             : 
    1087             : /*
    1088             :  * Remove the last-start-time entry for the subscription, if one exists.
    1089             :  *
    1090             :  * This has two use-cases: to remove the entry related to a subscription
    1091             :  * that's been deleted or disabled (just to avoid leaking shared memory),
    1092             :  * and to allow immediate restart of an apply worker that has exited
    1093             :  * due to subscription parameter changes.
    1094             :  */
    1095             : void
    1096         320 : ApplyLauncherForgetWorkerStartTime(Oid subid)
    1097             : {
    1098         320 :     logicalrep_launcher_attach_dshmem();
    1099             : 
    1100         320 :     (void) dshash_delete_key(last_start_times, &subid);
    1101         320 : }
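
The three helpers above all follow the dshash locking discipline: dshash_find() and dshash_find_or_insert() return with the entry's partition lock held, so the caller must call dshash_release_lock() before doing anything else, while dshash_delete_key() locks and unlocks internally. A minimal sketch of a hypothetical helper built on the same pattern (the function and its cutoff parameter are illustrative, not part of this file):

    /* Hypothetical: bump a subscription's stored start time if stale. */
    static void
    refresh_start_time_if_stale(Oid subid, TimestampTz cutoff)
    {
        LauncherLastStartTimesEntry *entry;
        bool        found;

        logicalrep_launcher_attach_dshmem();

        /* Returns with the partition lock held; release before returning. */
        entry = dshash_find_or_insert(last_start_times, &subid, &found);
        if (!found || entry->last_start_time < cutoff)
            entry->last_start_time = GetCurrentTimestamp();
        dshash_release_lock(last_start_times, entry);
    }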
    1102             : 
    1103             : /*
     1104             :  * Wake up the launcher on commit if requested.
    1105             :  */
    1106             : void
    1107     1096240 : AtEOXact_ApplyLauncher(bool isCommit)
    1108             : {
    1109     1096240 :     if (isCommit)
    1110             :     {
    1111     1046376 :         if (on_commit_launcher_wakeup)
    1112         278 :             ApplyLauncherWakeup();
    1113             :     }
    1114             : 
    1115     1096240 :     on_commit_launcher_wakeup = false;
    1116     1096240 : }
    1117             : 
    1118             : /*
    1119             :  * Request wakeup of the launcher on commit of the transaction.
    1120             :  *
     1121             :  * This is used to signal the launcher to stop sleeping and process the
     1122             :  * subscriptions when the current transaction commits. It should be used
     1123             :  * when a new tuple has been added to the pg_subscription catalog.
     1124             :  */
    1125             : void
    1126         278 : ApplyLauncherWakeupAtCommit(void)
    1127             : {
    1128         278 :     if (!on_commit_launcher_wakeup)
    1129         278 :         on_commit_launcher_wakeup = true;
    1130         278 : }
    1131             : 
    1132             : /*
     1133             :  * Wake up the launcher immediately.
    1134             :  */
    1135             : void
    1136        1232 : ApplyLauncherWakeup(void)
    1137             : {
    1138        1232 :     if (LogicalRepCtx->launcher_pid != 0)
    1139        1212 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1140        1232 : }
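
Together these three functions implement a deferred wakeup: a backend that modifies pg_subscription sets a flag in its own transaction, and the signal is only sent once that transaction actually commits, so the launcher never reacts to changes that roll back. A sketch of the producer side (the call site lives in commands/subscriptioncmds.c; the snippet is a paraphrase, not a quote):

    /* In CREATE SUBSCRIPTION, after inserting the new catalog row: */
    CatalogTupleInsert(rel, tup);       /* new pg_subscription tuple */
    ApplyLauncherWakeupAtCommit();      /* defer the signal to commit time */

    /* Later, from the transaction-commit path: */
    AtEOXact_ApplyLauncher(true);       /* calls ApplyLauncherWakeup() -> SIGUSR1 */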
    1141             : 
    1142             : /*
    1143             :  * Main loop for the apply launcher process.
    1144             :  */
    1145             : void
    1146         818 : ApplyLauncherMain(Datum main_arg)
    1147             : {
    1148         818 :     ereport(DEBUG1,
    1149             :             (errmsg_internal("logical replication launcher started")));
    1150             : 
    1151         818 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1152             : 
    1153             :     Assert(LogicalRepCtx->launcher_pid == 0);
    1154         818 :     LogicalRepCtx->launcher_pid = MyProcPid;
    1155             : 
    1156             :     /* Establish signal handlers. */
    1157         818 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1158         818 :     pqsignal(SIGTERM, die);
    1159         818 :     BackgroundWorkerUnblockSignals();
    1160             : 
    1161             :     /*
    1162             :      * Establish connection to nailed catalogs (we only ever access
    1163             :      * pg_subscription).
    1164             :      */
    1165         818 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1166             : 
    1167             :     /*
    1168             :      * Acquire the conflict detection slot at startup to ensure it can be
    1169             :      * dropped if no longer needed after a restart.
    1170             :      */
    1171         818 :     acquire_conflict_slot_if_exists();
    1172             : 
    1173             :     /* Enter main loop */
    1174             :     for (;;)
    1175        4504 :     {
    1176             :         int         rc;
    1177             :         List       *sublist;
    1178             :         ListCell   *lc;
    1179             :         MemoryContext subctx;
    1180             :         MemoryContext oldctx;
    1181        5322 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1182        5322 :         bool        can_advance_xmin = true;
    1183        5322 :         bool        retain_dead_tuples = false;
    1184        5322 :         TransactionId xmin = InvalidTransactionId;
    1185             : 
    1186        5322 :         CHECK_FOR_INTERRUPTS();
    1187             : 
    1188             :         /* Use temporary context to avoid leaking memory across cycles. */
    1189        5318 :         subctx = AllocSetContextCreate(TopMemoryContext,
    1190             :                                        "Logical Replication Launcher sublist",
    1191             :                                        ALLOCSET_DEFAULT_SIZES);
    1192        5318 :         oldctx = MemoryContextSwitchTo(subctx);
    1193             : 
    1194             :         /*
    1195             :          * Start any missing workers for enabled subscriptions.
    1196             :          *
    1197             :          * Also, during the iteration through all subscriptions, we compute
    1198             :          * the minimum XID required to protect deleted tuples for conflict
     1199             :          * detection if any of the subscriptions enables the
     1200             :          * retain_dead_tuples option.
    1201             :          */
    1202        5318 :         sublist = get_subscription_list();
    1203        6702 :         foreach(lc, sublist)
    1204             :         {
    1205        1384 :             Subscription *sub = (Subscription *) lfirst(lc);
    1206             :             LogicalRepWorker *w;
    1207             :             TimestampTz last_start;
    1208             :             TimestampTz now;
    1209             :             long        elapsed;
    1210             : 
    1211        1384 :             if (sub->retaindeadtuples)
    1212             :             {
    1213          12 :                 retain_dead_tuples = true;
    1214             : 
    1215             :                 /*
    1216             :                  * Can't advance xmin of the slot unless all the subscriptions
    1217             :                  * with retain_dead_tuples are enabled. This is required to
    1218             :                  * ensure that we don't advance the xmin of
    1219             :                  * CONFLICT_DETECTION_SLOT if one of the subscriptions is not
    1220             :                  * enabled. Otherwise, we won't be able to detect conflicts
    1221             :                  * reliably for such a subscription even though it has set the
    1222             :                  * retain_dead_tuples option.
    1223             :                  */
    1224          12 :                 can_advance_xmin &= sub->enabled;
    1225             : 
    1226             :                 /*
    1227             :                  * Create a replication slot to retain information necessary
    1228             :                  * for conflict detection such as dead tuples, commit
    1229             :                  * timestamps, and origins.
    1230             :                  *
    1231             :                  * The slot is created before starting the apply worker to
    1232             :                  * prevent it from unnecessarily maintaining its
    1233             :                  * oldest_nonremovable_xid.
    1234             :                  *
    1235             :                  * The slot is created even for a disabled subscription to
    1236             :                  * ensure that conflict-related information is available when
    1237             :                  * applying remote changes that occurred before the
    1238             :                  * subscription was enabled.
    1239             :                  */
    1240          12 :                 CreateConflictDetectionSlot();
    1241             :             }
    1242             : 
    1243        1384 :             if (!sub->enabled)
    1244          62 :                 continue;
    1245             : 
    1246        1322 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1247        1322 :             w = logicalrep_worker_find(sub->oid, InvalidOid, false);
    1248        1322 :             LWLockRelease(LogicalRepWorkerLock);
    1249             : 
    1250        1322 :             if (w != NULL)
    1251             :             {
    1252             :                 /*
     1253             :                  * Compute the minimum xmin needed to protect the dead tuples
     1254             :                  * required for conflict detection among all running apply
     1255             :                  * workers whose subscriptions enable retain_dead_tuples.
    1256             :                  */
    1257         862 :                 if (sub->retaindeadtuples && can_advance_xmin)
    1258           8 :                     compute_min_nonremovable_xid(w, &xmin);
    1259             : 
    1260             :                 /* worker is running already */
    1261         862 :                 continue;
    1262             :             }
    1263             : 
    1264             :             /*
    1265             :              * Can't advance xmin of the slot unless all the workers
    1266             :              * corresponding to subscriptions with retain_dead_tuples are
     1267             :              * running, so disable any further computation of the minimum
    1268             :              * nonremovable xid.
    1269             :              */
    1270         460 :             if (sub->retaindeadtuples)
    1271           2 :                 can_advance_xmin = false;
    1272             : 
    1273             :             /*
    1274             :              * If the worker is eligible to start now, launch it.  Otherwise,
    1275             :              * adjust wait_time so that we'll wake up as soon as it can be
    1276             :              * started.
    1277             :              *
    1278             :              * Each subscription's apply worker can only be restarted once per
    1279             :              * wal_retrieve_retry_interval, so that errors do not cause us to
    1280             :              * repeatedly restart the worker as fast as possible.  In cases
    1281             :              * where a restart is expected (e.g., subscription parameter
    1282             :              * changes), another process should remove the last-start entry
    1283             :              * for the subscription so that the worker can be restarted
    1284             :              * without waiting for wal_retrieve_retry_interval to elapse.
    1285             :              */
    1286         460 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1287         460 :             now = GetCurrentTimestamp();
    1288         460 :             if (last_start == 0 ||
    1289         212 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1290             :             {
    1291         310 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1292         310 :                 if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
    1293         310 :                                               sub->dbid, sub->oid, sub->name,
    1294             :                                               sub->owner, InvalidOid,
    1295             :                                               DSM_HANDLE_INVALID,
    1296         310 :                                               sub->retaindeadtuples))
    1297             :                 {
    1298             :                     /*
    1299             :                      * We get here either if we failed to launch a worker
    1300             :                      * (perhaps for resource-exhaustion reasons) or if we
    1301             :                      * launched one but it immediately quit.  Either way, it
    1302             :                      * seems appropriate to try again after
    1303             :                      * wal_retrieve_retry_interval.
    1304             :                      */
    1305           6 :                     wait_time = Min(wait_time,
    1306             :                                     wal_retrieve_retry_interval);
    1307             :                 }
    1308             :             }
    1309             :             else
    1310             :             {
    1311         150 :                 wait_time = Min(wait_time,
    1312             :                                 wal_retrieve_retry_interval - elapsed);
    1313             :             }
    1314             :         }
    1315             : 
    1316             :         /*
    1317             :          * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
    1318             :          * that requires us to retain dead tuples. Otherwise, if required,
    1319             :          * advance the slot's xmin to protect dead tuples required for the
    1320             :          * conflict detection.
    1321             :          */
    1322        5318 :         if (MyReplicationSlot)
    1323             :         {
    1324          14 :             if (!retain_dead_tuples)
    1325           2 :                 ReplicationSlotDropAcquired();
    1326          12 :             else if (can_advance_xmin)
    1327           8 :                 advance_conflict_slot_xmin(xmin);
    1328             :         }
    1329             : 
    1330             :         /* Switch back to original memory context. */
    1331        5318 :         MemoryContextSwitchTo(oldctx);
    1332             :         /* Clean the temporary memory. */
    1333        5318 :         MemoryContextDelete(subctx);
    1334             : 
    1335             :         /* Wait for more work. */
    1336        5318 :         rc = WaitLatch(MyLatch,
    1337             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1338             :                        wait_time,
    1339             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1340             : 
    1341        5312 :         if (rc & WL_LATCH_SET)
    1342             :         {
    1343        5282 :             ResetLatch(MyLatch);
    1344        5282 :             CHECK_FOR_INTERRUPTS();
    1345             :         }
    1346             : 
    1347        4504 :         if (ConfigReloadPending)
    1348             :         {
    1349          72 :             ConfigReloadPending = false;
    1350          72 :             ProcessConfigFile(PGC_SIGHUP);
    1351             :         }
    1352             :     }
    1353             : 
    1354             :     /* Not reachable */
    1355             : }
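
The wait_time bookkeeping in the loop above means the launcher sleeps exactly as long as the most urgent pending restart requires. A worked example, assuming the default wal_retrieve_retry_interval of 5000 ms:

    last_start = T                      (stored by ApplyLauncherSetWorkerStartTime)
    now        = T + 2000 ms
    elapsed    = 2000 ms  <  5000 ms    (not yet eligible to restart)
    wait_time  = Min(180000 ms, 5000 ms - 2000 ms) = 3000 ms

so the WaitLatch() call times out just as the worker becomes eligible again, rather than after the full three-minute nap.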
    1356             : 
    1357             : /*
    1358             :  * Determine the minimum non-removable transaction ID across all apply workers
    1359             :  * for subscriptions that have retain_dead_tuples enabled. Store the result
    1360             :  * in *xmin.
    1361             :  */
    1362             : static void
    1363           8 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
    1364             : {
    1365             :     TransactionId nonremovable_xid;
    1366             : 
    1367             :     Assert(worker != NULL);
    1368             : 
    1369             :     /*
    1370             :      * The replication slot for conflict detection must be created before the
    1371             :      * worker starts.
    1372             :      */
    1373             :     Assert(MyReplicationSlot);
    1374             : 
    1375           8 :     SpinLockAcquire(&worker->relmutex);
    1376           8 :     nonremovable_xid = worker->oldest_nonremovable_xid;
    1377           8 :     SpinLockRelease(&worker->relmutex);
    1378             : 
    1379             :     Assert(TransactionIdIsValid(nonremovable_xid));
    1380             : 
    1381           8 :     if (!TransactionIdIsValid(*xmin) ||
    1382           0 :         TransactionIdPrecedes(nonremovable_xid, *xmin))
    1383           8 :         *xmin = nonremovable_xid;
    1384           8 : }
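
Note that TransactionIdPrecedes() is the wraparound-aware ("circular") comparison, so the minimum computed here is the logically oldest XID, not the numerically smallest. A standalone sketch of the same accumulation over a hypothetical worker array (nworkers and workers are illustrative names; unlike the function above, this variant skips workers that have not yet set a value instead of asserting):

    TransactionId min_xid = InvalidTransactionId;

    for (int i = 0; i < nworkers; i++)
    {
        TransactionId x = workers[i]->oldest_nonremovable_xid;

        if (!TransactionIdIsValid(x))
            continue;           /* worker hasn't published an xid yet */
        if (!TransactionIdIsValid(min_xid) || TransactionIdPrecedes(x, min_xid))
            min_xid = x;
    }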
    1385             : 
    1386             : /*
    1387             :  * Acquire the replication slot used to retain information for conflict
    1388             :  * detection, if it exists.
    1389             :  *
    1390             :  * Return true if successfully acquired, otherwise return false.
    1391             :  */
    1392             : static bool
    1393         818 : acquire_conflict_slot_if_exists(void)
    1394             : {
    1395         818 :     if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
    1396         818 :         return false;
    1397             : 
    1398           0 :     ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
    1399           0 :     return true;
    1400             : }
    1401             : 
    1402             : /*
     1403             :  * Advance the xmin of the replication slot used to retain information
     1404             :  * required for conflict detection.
    1405             :  */
    1406             : static void
    1407           8 : advance_conflict_slot_xmin(TransactionId new_xmin)
    1408             : {
    1409             :     Assert(MyReplicationSlot);
    1410             :     Assert(TransactionIdIsValid(new_xmin));
    1411             :     Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
    1412             : 
    1413             :     /* Return if the xmin value of the slot cannot be advanced */
    1414           8 :     if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
    1415           6 :         return;
    1416             : 
    1417           2 :     SpinLockAcquire(&MyReplicationSlot->mutex);
    1418           2 :     MyReplicationSlot->effective_xmin = new_xmin;
    1419           2 :     MyReplicationSlot->data.xmin = new_xmin;
    1420           2 :     SpinLockRelease(&MyReplicationSlot->mutex);
    1421             : 
    1422           2 :     elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
    1423             : 
    1424           2 :     ReplicationSlotMarkDirty();
    1425           2 :     ReplicationSlotsComputeRequiredXmin(false);
    1426             : 
    1427             :     /*
    1428             :      * Like PhysicalConfirmReceivedLocation(), do not save slot information
    1429             :      * each time. This is acceptable because all concurrent transactions on
    1430             :      * the publisher that require the data preceding the slot's xmin should
    1431             :      * have already been applied and flushed on the subscriber before the xmin
    1432             :      * is advanced. So, even if the slot's xmin regresses after a restart, it
    1433             :      * will be advanced again in the next cycle. Therefore, no data required
    1434             :      * for conflict detection will be prematurely removed.
    1435             :      */
    1436           2 :     return;
    1437             : }
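
The final comment above trades a disk write per advancement for the possibility that the on-disk xmin lags the in-memory one. A worked scenario under that assumption (XID values are illustrative):

    1. The in-memory xmin advances 700 -> 750; the slot is marked dirty but
       not yet saved.
    2. The server restarts before the change is persisted; the slot file
       still records xmin = 700, so the horizon regresses.
    3. Nothing is lost: tuples in the 700..750 range were merely retained
       longer than strictly necessary, and the next launcher cycle
       recomputes the minimum and advances the slot to at least 750 again.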
    1438             : 
    1439             : /*
    1440             :  * Create and acquire the replication slot used to retain information for
     1441             :  * conflict detection, if it does not exist yet.
    1442             :  */
    1443             : void
    1444          14 : CreateConflictDetectionSlot(void)
    1445             : {
    1446             :     TransactionId xmin_horizon;
    1447             : 
     1448             :     /* Exit early if the replication slot is already created and acquired */
    1449          14 :     if (MyReplicationSlot)
    1450           8 :         return;
    1451             : 
    1452           6 :     ereport(LOG,
    1453             :             errmsg("creating replication conflict detection slot"));
    1454             : 
    1455           6 :     ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
    1456             :                           false, false);
    1457             : 
    1458           6 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    1459             : 
    1460           6 :     xmin_horizon = GetOldestSafeDecodingTransactionId(false);
    1461             : 
    1462           6 :     SpinLockAcquire(&MyReplicationSlot->mutex);
    1463           6 :     MyReplicationSlot->effective_xmin = xmin_horizon;
    1464           6 :     MyReplicationSlot->data.xmin = xmin_horizon;
    1465           6 :     SpinLockRelease(&MyReplicationSlot->mutex);
    1466             : 
    1467           6 :     ReplicationSlotsComputeRequiredXmin(true);
    1468             : 
    1469           6 :     LWLockRelease(ProcArrayLock);
    1470             : 
    1471             :     /* Write this slot to disk */
    1472           6 :     ReplicationSlotMarkDirty();
    1473           6 :     ReplicationSlotSave();
    1474             : }
    1475             : 
    1476             : /*
    1477             :  * Is current process the logical replication launcher?
    1478             :  */
    1479             : bool
    1480        4832 : IsLogicalLauncher(void)
    1481             : {
    1482        4832 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1483             : }
    1484             : 
    1485             : /*
    1486             :  * Return the pid of the leader apply worker if the given pid is the pid of a
     1487             :  * parallel apply worker; otherwise, return InvalidPid.
    1488             :  */
    1489             : pid_t
    1490        1830 : GetLeaderApplyWorkerPid(pid_t pid)
    1491             : {
    1492        1830 :     int         leader_pid = InvalidPid;
    1493             :     int         i;
    1494             : 
    1495        1830 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1496             : 
    1497        9150 :     for (i = 0; i < max_logical_replication_workers; i++)
    1498             :     {
    1499        7320 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1500             : 
    1501        7320 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
    1502             :         {
    1503           0 :             leader_pid = w->leader_pid;
    1504           0 :             break;
    1505             :         }
    1506             :     }
    1507             : 
    1508        1830 :     LWLockRelease(LogicalRepWorkerLock);
    1509             : 
    1510        1830 :     return leader_pid;
    1511             : }
    1512             : 
    1513             : /*
     1514             :  * Returns the state of the subscriptions.
    1515             :  */
    1516             : Datum
    1517           2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1518             : {
    1519             : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
    1520           2 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1521             :     int         i;
    1522           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1523             : 
    1524           2 :     InitMaterializedSRF(fcinfo, 0);
    1525             : 
    1526             :     /* Make sure we get consistent view of the workers. */
    1527           2 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1528             : 
    1529          10 :     for (i = 0; i < max_logical_replication_workers; i++)
    1530             :     {
    1531             :         /* for each row */
    1532           8 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1533           8 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1534             :         int         worker_pid;
    1535             :         LogicalRepWorker worker;
    1536             : 
    1537           8 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1538             :                sizeof(LogicalRepWorker));
    1539           8 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1540           4 :             continue;
    1541             : 
    1542           4 :         if (OidIsValid(subid) && worker.subid != subid)
    1543           0 :             continue;
    1544             : 
    1545           4 :         worker_pid = worker.proc->pid;
    1546             : 
    1547           4 :         values[0] = ObjectIdGetDatum(worker.subid);
    1548           4 :         if (isTablesyncWorker(&worker))
    1549           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1550             :         else
    1551           4 :             nulls[1] = true;
    1552           4 :         values[2] = Int32GetDatum(worker_pid);
    1553             : 
    1554           4 :         if (isParallelApplyWorker(&worker))
    1555           0 :             values[3] = Int32GetDatum(worker.leader_pid);
    1556             :         else
    1557           4 :             nulls[3] = true;
    1558             : 
    1559           4 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
    1560           2 :             nulls[4] = true;
    1561             :         else
    1562           2 :             values[4] = LSNGetDatum(worker.last_lsn);
    1563           4 :         if (worker.last_send_time == 0)
    1564           0 :             nulls[5] = true;
    1565             :         else
    1566           4 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
    1567           4 :         if (worker.last_recv_time == 0)
    1568           0 :             nulls[6] = true;
    1569             :         else
    1570           4 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1571           4 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1572           2 :             nulls[7] = true;
    1573             :         else
    1574           2 :             values[7] = LSNGetDatum(worker.reply_lsn);
    1575           4 :         if (worker.reply_time == 0)
    1576           0 :             nulls[8] = true;
    1577             :         else
    1578           4 :             values[8] = TimestampTzGetDatum(worker.reply_time);
    1579             : 
    1580           4 :         switch (worker.type)
    1581             :         {
    1582           4 :             case WORKERTYPE_APPLY:
    1583           4 :                 values[9] = CStringGetTextDatum("apply");
    1584           4 :                 break;
    1585           0 :             case WORKERTYPE_PARALLEL_APPLY:
    1586           0 :                 values[9] = CStringGetTextDatum("parallel apply");
    1587           0 :                 break;
    1588           0 :             case WORKERTYPE_TABLESYNC:
    1589           0 :                 values[9] = CStringGetTextDatum("table synchronization");
    1590           0 :                 break;
    1591           0 :             case WORKERTYPE_UNKNOWN:
    1592             :                 /* Should never happen. */
    1593           0 :                 elog(ERROR, "unknown worker type");
    1594             :         }
    1595             : 
    1596           4 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1597             :                              values, nulls);
    1598             : 
    1599             :         /*
    1600             :          * If only a single subscription was requested, and we found it,
    1601             :          * break.
    1602             :          */
    1603           4 :         if (OidIsValid(subid))
    1604           0 :             break;
    1605             :     }
    1606             : 
    1607           2 :     LWLockRelease(LogicalRepWorkerLock);
    1608             : 
    1609           2 :     return (Datum) 0;
    1610             : }
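
pg_stat_get_subscription() (the function behind the pg_stat_subscription view) follows the standard materialized set-returning-function pattern: InitMaterializedSRF() sets up a tuplestore and result descriptor, and each row is emitted with tuplestore_putvalues(). A minimal sketch of the same pattern (the function name and single int4 column are hypothetical; PG_FUNCTION_INFO_V1 and CREATE FUNCTION boilerplate are omitted):

    #include "postgres.h"
    #include "funcapi.h"

    Datum
    my_stats(PG_FUNCTION_ARGS)
    {
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

        /* Create the tuplestore and result descriptor for us. */
        InitMaterializedSRF(fcinfo, 0);

        for (int i = 0; i < 3; i++)
        {
            Datum       values[1];
            bool        nulls[1] = {0};

            values[0] = Int32GetDatum(i);
            tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                                 values, nulls);
        }

        /* The rows live in the tuplestore; the Datum result is unused. */
        return (Datum) 0;
    }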

Generated by: LCOV version 1.16