LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c
Test: PostgreSQL 18devel
Date: 2025-01-18 04:15:08

                 Hit    Total    Coverage
Lines:           409      464      88.1 %
Functions:        31       31     100.0 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
      11             :  *    This module contains the logical replication worker launcher which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "access/heapam.h"
      21             : #include "access/htup.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/pg_subscription.h"
      26             : #include "catalog/pg_subscription_rel.h"
      27             : #include "funcapi.h"
      28             : #include "lib/dshash.h"
      29             : #include "miscadmin.h"
      30             : #include "pgstat.h"
      31             : #include "postmaster/bgworker.h"
      32             : #include "postmaster/interrupt.h"
      33             : #include "replication/logicallauncher.h"
      34             : #include "replication/slot.h"
      35             : #include "replication/walreceiver.h"
      36             : #include "replication/worker_internal.h"
      37             : #include "storage/ipc.h"
      38             : #include "storage/proc.h"
      39             : #include "storage/procarray.h"
      40             : #include "tcop/tcopprot.h"
      41             : #include "utils/builtins.h"
      42             : #include "utils/memutils.h"
      43             : #include "utils/pg_lsn.h"
      44             : #include "utils/snapmgr.h"
      45             : 
      46             : /* max sleep time between cycles (3min) */
      47             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      48             : 
      49             : /* GUC variables */
      50             : int         max_logical_replication_workers = 4;
      51             : int         max_sync_workers_per_subscription = 2;
      52             : int         max_parallel_apply_workers_per_subscription = 2;
      53             : 
      54             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      55             : 
      56             : typedef struct LogicalRepCtxStruct
      57             : {
      58             :     /* Supervisor process. */
      59             :     pid_t       launcher_pid;
      60             : 
      61             :     /* Hash table holding last start times of subscriptions' apply workers. */
      62             :     dsa_handle  last_start_dsa;
      63             :     dshash_table_handle last_start_dsh;
      64             : 
      65             :     /* Background workers. */
      66             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      67             : } LogicalRepCtxStruct;
      68             : 
      69             : static LogicalRepCtxStruct *LogicalRepCtx;
      70             : 
      71             : /* an entry in the last-start-times shared hash table */
      72             : typedef struct LauncherLastStartTimesEntry
      73             : {
      74             :     Oid         subid;          /* OID of logrep subscription (hash key) */
      75             :     TimestampTz last_start_time;    /* last time its apply worker was started */
      76             : } LauncherLastStartTimesEntry;
      77             : 
      78             : /* parameters for the last-start-times shared hash table */
      79             : static const dshash_parameters dsh_params = {
      80             :     sizeof(Oid),
      81             :     sizeof(LauncherLastStartTimesEntry),
      82             :     dshash_memcmp,
      83             :     dshash_memhash,
      84             :     dshash_memcpy,
      85             :     LWTRANCHE_LAUNCHER_HASH
      86             : };
      87             : 
      88             : static dsa_area *last_start_times_dsa = NULL;
      89             : static dshash_table *last_start_times = NULL;
      90             : 
      91             : static bool on_commit_launcher_wakeup = false;
      92             : 
      93             : 
      94             : static void ApplyLauncherWakeup(void);
      95             : static void logicalrep_launcher_onexit(int code, Datum arg);
      96             : static void logicalrep_worker_onexit(int code, Datum arg);
      97             : static void logicalrep_worker_detach(void);
      98             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
      99             : static int  logicalrep_pa_worker_count(Oid subid);
     100             : static void logicalrep_launcher_attach_dshmem(void);
     101             : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
     102             : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
     103             : 
     104             : 
     105             : /*
     106             :  * Load the list of subscriptions.
     107             :  *
     108             :  * Only the fields interesting for worker start/stop functions are filled for
     109             :  * each subscription.
     110             :  */
     111             : static List *
     112        4474 : get_subscription_list(void)
     113             : {
     114        4474 :     List       *res = NIL;
     115             :     Relation    rel;
     116             :     TableScanDesc scan;
     117             :     HeapTuple   tup;
     118             :     MemoryContext resultcxt;
     119             : 
     120             :     /* This is the context that we will allocate our output data in */
     121        4474 :     resultcxt = CurrentMemoryContext;
     122             : 
     123             :     /*
     124             :      * Start a transaction so we can access pg_subscription.
     125             :      */
     126        4474 :     StartTransactionCommand();
     127             : 
     128        4474 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     129        4474 :     scan = table_beginscan_catalog(rel, 0, NULL);
     130             : 
     131        5264 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     132             :     {
     133         790 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     134             :         Subscription *sub;
     135             :         MemoryContext oldcxt;
     136             : 
     137             :         /*
     138             :          * Allocate our results in the caller's context, not the
     139             :          * transaction's. We do this inside the loop, and restore the original
     140             :          * context at the end, so that leaky things like heap_getnext() are
     141             :          * not called in a potentially long-lived context.
     142             :          */
     143         790 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     144             : 
     145         790 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     146         790 :         sub->oid = subform->oid;
     147         790 :         sub->dbid = subform->subdbid;
     148         790 :         sub->owner = subform->subowner;
     149         790 :         sub->enabled = subform->subenabled;
     150         790 :         sub->name = pstrdup(NameStr(subform->subname));
     151             :         /* We don't fill fields we are not interested in. */
     152             : 
     153         790 :         res = lappend(res, sub);
     154         790 :         MemoryContextSwitchTo(oldcxt);
     155             :     }
     156             : 
     157        4474 :     table_endscan(scan);
     158        4474 :     table_close(rel, AccessShareLock);
     159             : 
     160        4474 :     CommitTransactionCommand();
     161             : 
     162        4474 :     return res;
     163             : }
     164             : 
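One point worth seeing from the caller's side: because every list cell and
Subscription struct above lands in the caller's memory context, the whole
result can be discarded in a single operation.  A minimal sketch of how a
caller such as the launcher's main loop consumes the list (the context name
is illustrative):

    /* Short-lived context that owns everything get_subscription_list() returns. */
    MemoryContext subctx = AllocSetContextCreate(TopMemoryContext,
                                                 "launcher sublist (sketch)",
                                                 ALLOCSET_DEFAULT_SIZES);
    MemoryContext oldctx = MemoryContextSwitchTo(subctx);
    List       *sublist = get_subscription_list();
    ListCell   *lc;

    foreach(lc, sublist)
    {
        Subscription *sub = (Subscription *) lfirst(lc);

        /* inspect sub->enabled, sub->oid, ... */
    }

    MemoryContextSwitchTo(oldctx);
    MemoryContextDelete(subctx);    /* frees the list and all Subscriptions at once */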
     165             : /*
     166             :  * Wait for a background worker to start up and attach to the shmem context.
     167             :  *
     168             :  * This is only needed for cleaning up the shared memory in case the worker
     169             :  * fails to attach.
     170             :  *
     171             :  * Returns whether the attach was successful.
     172             :  */
     173             : static bool
     174        3382 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     175             :                                uint16 generation,
     176             :                                BackgroundWorkerHandle *handle)
     177             : {
     178             :     BgwHandleStatus status;
     179             :     int         rc;
     180             : 
     181             :     for (;;)
     182        2728 :     {
     183             :         pid_t       pid;
     184             : 
     185        3382 :         CHECK_FOR_INTERRUPTS();
     186             : 
     187        3382 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     188             : 
     189             :         /* Worker either died or has started. Return false if died. */
     190        3382 :         if (!worker->in_use || worker->proc)
     191             :         {
     192         652 :             LWLockRelease(LogicalRepWorkerLock);
     193         652 :             return worker->in_use;
     194             :         }
     195             : 
     196        2730 :         LWLockRelease(LogicalRepWorkerLock);
     197             : 
     198             :         /* Check if worker has died before attaching, and clean up after it. */
     199        2730 :         status = GetBackgroundWorkerPid(handle, &pid);
     200             : 
     201        2730 :         if (status == BGWH_STOPPED)
     202             :         {
     203           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     204             :             /* Ensure that this was indeed the worker we waited for. */
     205           0 :             if (generation == worker->generation)
     206           0 :                 logicalrep_worker_cleanup(worker);
     207           0 :             LWLockRelease(LogicalRepWorkerLock);
     208           0 :             return false;
     209             :         }
     210             : 
     211             :         /*
      212             :          * We need a timeout because we generally don't get notified via latch
      213             :          * about the worker attach.  But we don't expect to have to wait long.
     214             :          */
     215        2730 :         rc = WaitLatch(MyLatch,
     216             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     217             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     218             : 
     219        2730 :         if (rc & WL_LATCH_SET)
     220             :         {
     221         838 :             ResetLatch(MyLatch);
     222         838 :             CHECK_FOR_INTERRUPTS();
     223             :         }
     224             :     }
     225             : }
     226             : 
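The loop above is an instance of the standard latch-wait idiom: poll a
shared-memory condition on a short timeout, and reset the latch whenever it
fires so that a later SetLatch() is not lost.  A minimal sketch, assuming a
hypothetical condition_is_met() predicate:

    for (;;)
    {
        int         rc;

        CHECK_FOR_INTERRUPTS();

        if (condition_is_met())     /* hypothetical shared-state check */
            break;

        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                       10L /* ms */, WAIT_EVENT_BGWORKER_STARTUP);

        if (rc & WL_LATCH_SET)
        {
            ResetLatch(MyLatch);    /* reset before rechecking the condition */
            CHECK_FOR_INTERRUPTS();
        }
    }

WL_EXIT_ON_PM_DEATH makes postmaster death fatal without an explicit check,
which is why neither loop tests WL_POSTMASTER_DEATH itself.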
     227             : /*
      228             :  * Walks the workers array and searches for one that matches the given
     229             :  * subscription id and relid.
     230             :  *
     231             :  * We are only interested in the leader apply worker or table sync worker.
     232             :  */
     233             : LogicalRepWorker *
     234        4452 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     235             : {
     236             :     int         i;
     237        4452 :     LogicalRepWorker *res = NULL;
     238             : 
     239             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     240             : 
     241             :     /* Search for attached worker for a given subscription id. */
     242       13576 :     for (i = 0; i < max_logical_replication_workers; i++)
     243             :     {
     244       11892 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     245             : 
     246             :         /* Skip parallel apply workers. */
     247       11892 :         if (isParallelApplyWorker(w))
     248           0 :             continue;
     249             : 
     250       11892 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     251        2768 :             (!only_running || w->proc))
     252             :         {
     253        2768 :             res = w;
     254        2768 :             break;
     255             :         }
     256             :     }
     257             : 
     258        4452 :     return res;
     259             : }
     260             : 
     261             : /*
     262             :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     263             :  * the subscription, instead of just one.
     264             :  */
     265             : List *
     266         978 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
     267             : {
     268             :     int         i;
     269         978 :     List       *res = NIL;
     270             : 
     271         978 :     if (acquire_lock)
     272         200 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     273             : 
     274             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     275             : 
     276             :     /* Search for attached worker for a given subscription id. */
     277        5086 :     for (i = 0; i < max_logical_replication_workers; i++)
     278             :     {
     279        4108 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     280             : 
     281        4108 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     282         654 :             res = lappend(res, w);
     283             :     }
     284             : 
     285         978 :     if (acquire_lock)
     286         200 :         LWLockRelease(LogicalRepWorkerLock);
     287             : 
     288         978 :     return res;
     289             : }
     290             : 
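A usage sketch, modeled on how DROP SUBSCRIPTION shuts down all of a
subscription's workers: fetch the list under a shared lock, release it, then
stop each worker (logicalrep_worker_stop() re-takes the lock internally):

    List       *subworkers;
    ListCell   *lc;

    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    subworkers = logicalrep_workers_find(subid, false, false);
    LWLockRelease(LogicalRepWorkerLock);

    foreach(lc, subworkers)
    {
        LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);

        logicalrep_worker_stop(w->subid, w->relid);
    }
    list_free(subworkers);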
     291             : /*
      292             :  * Start a new logical replication background worker, if possible.
     293             :  *
     294             :  * Returns true on success, false on failure.
     295             :  */
     296             : bool
     297         654 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
     298             :                          Oid dbid, Oid subid, const char *subname, Oid userid,
     299             :                          Oid relid, dsm_handle subworker_dsm)
     300             : {
     301             :     BackgroundWorker bgw;
     302             :     BackgroundWorkerHandle *bgw_handle;
     303             :     uint16      generation;
     304             :     int         i;
     305         654 :     int         slot = 0;
     306         654 :     LogicalRepWorker *worker = NULL;
     307             :     int         nsyncworkers;
     308             :     int         nparallelapplyworkers;
     309             :     TimestampTz now;
     310         654 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
     311         654 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
     312             : 
     313             :     /*----------
     314             :      * Sanity checks:
     315             :      * - must be valid worker type
     316             :      * - tablesync workers are only ones to have relid
     317             :      * - parallel apply worker is the only kind of subworker
     318             :      */
     319             :     Assert(wtype != WORKERTYPE_UNKNOWN);
     320             :     Assert(is_tablesync_worker == OidIsValid(relid));
     321             :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
     322             : 
     323         654 :     ereport(DEBUG1,
     324             :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     325             :                              subname)));
     326             : 
     327             :     /* Report this after the initial starting message for consistency. */
     328         654 :     if (max_replication_slots == 0)
     329           0 :         ereport(ERROR,
     330             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     331             :                  errmsg("cannot start logical replication workers when \"max_replication_slots\"=0")));
     332             : 
     333             :     /*
     334             :      * We need to do the modification of the shared memory under lock so that
     335             :      * we have consistent view.
     336             :      */
     337         654 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     338             : 
     339         654 : retry:
     340             :     /* Find unused worker slot. */
     341        1184 :     for (i = 0; i < max_logical_replication_workers; i++)
     342             :     {
     343        1184 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     344             : 
     345        1184 :         if (!w->in_use)
     346             :         {
     347         654 :             worker = w;
     348         654 :             slot = i;
     349         654 :             break;
     350             :         }
     351             :     }
     352             : 
     353         654 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     354             : 
     355         654 :     now = GetCurrentTimestamp();
     356             : 
     357             :     /*
      358             :      * If we didn't find a free slot, try to do garbage collection.  We do
      359             :      * this because if some worker failed to start up and its parent crashed
      360             :      * while waiting, the in_use state was never cleared.
     361             :      */
     362         654 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     363             :     {
     364           0 :         bool        did_cleanup = false;
     365             : 
     366           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     367             :         {
     368           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     369             : 
     370             :             /*
     371             :              * If the worker was marked in use but didn't manage to attach in
     372             :              * time, clean it up.
     373             :              */
     374           0 :             if (w->in_use && !w->proc &&
     375           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     376             :                                            wal_receiver_timeout))
     377             :             {
     378           0 :                 elog(WARNING,
     379             :                      "logical replication worker for subscription %u took too long to start; canceled",
     380             :                      w->subid);
     381             : 
     382           0 :                 logicalrep_worker_cleanup(w);
     383           0 :                 did_cleanup = true;
     384             :             }
     385             :         }
     386             : 
     387           0 :         if (did_cleanup)
     388           0 :             goto retry;
     389             :     }
     390             : 
     391             :     /*
      392             :      * We don't allow invoking more sync workers once we have reached the
      393             :      * sync worker limit per subscription.  So, just return silently, as we
      394             :      * might get here because of an otherwise harmless race condition.
     395             :      */
     396         654 :     if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
     397             :     {
     398           0 :         LWLockRelease(LogicalRepWorkerLock);
     399           0 :         return false;
     400             :     }
     401             : 
     402         654 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     403             : 
     404             :     /*
     405             :      * Return false if the number of parallel apply workers reached the limit
     406             :      * per subscription.
     407             :      */
     408         654 :     if (is_parallel_apply_worker &&
     409          20 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     410             :     {
     411           0 :         LWLockRelease(LogicalRepWorkerLock);
     412           0 :         return false;
     413             :     }
     414             : 
     415             :     /*
      416             :      * However, if there are no more free worker slots, inform the user about
      417             :      * it before exiting.
     418             :      */
     419         654 :     if (worker == NULL)
     420             :     {
     421           0 :         LWLockRelease(LogicalRepWorkerLock);
     422           0 :         ereport(WARNING,
     423             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     424             :                  errmsg("out of logical replication worker slots"),
     425             :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
     426           0 :         return false;
     427             :     }
     428             : 
     429             :     /* Prepare the worker slot. */
     430         654 :     worker->type = wtype;
     431         654 :     worker->launch_time = now;
     432         654 :     worker->in_use = true;
     433         654 :     worker->generation++;
     434         654 :     worker->proc = NULL;
     435         654 :     worker->dbid = dbid;
     436         654 :     worker->userid = userid;
     437         654 :     worker->subid = subid;
     438         654 :     worker->relid = relid;
     439         654 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     440         654 :     worker->relstate_lsn = InvalidXLogRecPtr;
     441         654 :     worker->stream_fileset = NULL;
     442         654 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     443         654 :     worker->parallel_apply = is_parallel_apply_worker;
     444         654 :     worker->last_lsn = InvalidXLogRecPtr;
     445         654 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     446         654 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     447         654 :     worker->reply_lsn = InvalidXLogRecPtr;
     448         654 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     449             : 
     450             :     /* Before releasing lock, remember generation for future identification. */
     451         654 :     generation = worker->generation;
     452             : 
     453         654 :     LWLockRelease(LogicalRepWorkerLock);
     454             : 
     455             :     /* Register the new dynamic worker. */
     456         654 :     memset(&bgw, 0, sizeof(bgw));
     457         654 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     458             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     459         654 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     460         654 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     461             : 
     462         654 :     switch (worker->type)
     463             :     {
     464         268 :         case WORKERTYPE_APPLY:
     465         268 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     466         268 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     467             :                      "logical replication apply worker for subscription %u",
     468             :                      subid);
     469         268 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
     470         268 :             break;
     471             : 
     472          20 :         case WORKERTYPE_PARALLEL_APPLY:
     473          20 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     474          20 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     475             :                      "logical replication parallel apply worker for subscription %u",
     476             :                      subid);
     477          20 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     478             : 
     479          20 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     480          20 :             break;
     481             : 
     482         366 :         case WORKERTYPE_TABLESYNC:
     483         366 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
     484         366 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     485             :                      "logical replication tablesync worker for subscription %u sync %u",
     486             :                      subid,
     487             :                      relid);
     488         366 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
     489         366 :             break;
     490             : 
     491           0 :         case WORKERTYPE_UNKNOWN:
     492             :             /* Should never happen. */
     493           0 :             elog(ERROR, "unknown worker type");
     494             :     }
     495             : 
     496         654 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     497         654 :     bgw.bgw_notify_pid = MyProcPid;
     498         654 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     499             : 
     500         654 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     501             :     {
     502             :         /* Failed to start worker, so clean up the worker slot. */
     503           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     504             :         Assert(generation == worker->generation);
     505           0 :         logicalrep_worker_cleanup(worker);
     506           0 :         LWLockRelease(LogicalRepWorkerLock);
     507             : 
     508           0 :         ereport(WARNING,
     509             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     510             :                  errmsg("out of background worker slots"),
     511             :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
     512           0 :         return false;
     513             :     }
     514             : 
     515             :     /* Now wait until it attaches. */
     516         654 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     517             : }
     518             : 
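The registration half of this function follows the generic dynamic
background worker recipe.  A reduced sketch with illustrative names
(my_worker_main is hypothetical); callers that don't need launcher.c's
custom attach-wait can use the stock WaitForBackgroundWorkerStartup():

    BackgroundWorker bgw;
    BackgroundWorkerHandle *handle;
    pid_t       pid;

    memset(&bgw, 0, sizeof(bgw));
    bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
    bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
    bgw.bgw_restart_time = BGW_NEVER_RESTART;
    snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
    snprintf(bgw.bgw_function_name, BGW_MAXLEN, "my_worker_main");
    snprintf(bgw.bgw_name, BGW_MAXLEN, "sketch worker");
    bgw.bgw_notify_pid = MyProcPid;     /* deliver SIGUSR1 on state changes */

    if (!RegisterDynamicBackgroundWorker(&bgw, &handle))
        ereport(WARNING, (errmsg("out of background worker slots")));
    else if (WaitForBackgroundWorkerStartup(handle, &pid) == BGWH_STARTED)
        elog(DEBUG1, "worker running as pid %d", (int) pid);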
     519             : /*
     520             :  * Internal function to stop the worker and wait until it detaches from the
     521             :  * slot.
     522             :  */
     523             : static void
     524         144 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
     525             : {
     526             :     uint16      generation;
     527             : 
     528             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
     529             : 
     530             :     /*
      531             :      * Remember the generation of our worker so we can later check whether
      532             :      * the slot still holds the same one.
     533             :      */
     534         144 :     generation = worker->generation;
     535             : 
     536             :     /*
     537             :      * If we found a worker but it does not have proc set then it is still
     538             :      * starting up; wait for it to finish starting and then kill it.
     539             :      */
     540         158 :     while (worker->in_use && !worker->proc)
     541             :     {
     542             :         int         rc;
     543             : 
     544          20 :         LWLockRelease(LogicalRepWorkerLock);
     545             : 
     546             :         /* Wait a bit --- we don't expect to have to wait long. */
     547          20 :         rc = WaitLatch(MyLatch,
     548             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     549             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     550             : 
     551          20 :         if (rc & WL_LATCH_SET)
     552             :         {
     553           0 :             ResetLatch(MyLatch);
     554           0 :             CHECK_FOR_INTERRUPTS();
     555             :         }
     556             : 
     557             :         /* Recheck worker status. */
     558          20 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     559             : 
     560             :         /*
     561             :          * Check whether the worker slot is no longer used, which would mean
     562             :          * that the worker has exited, or whether the worker generation is
     563             :          * different, meaning that a different worker has taken the slot.
     564             :          */
     565          20 :         if (!worker->in_use || worker->generation != generation)
     566           0 :             return;
     567             : 
     568             :         /* Worker has assigned proc, so it has started. */
     569          20 :         if (worker->proc)
     570           6 :             break;
     571             :     }
     572             : 
     573             :     /* Now terminate the worker ... */
     574         144 :     kill(worker->proc->pid, signo);
     575             : 
     576             :     /* ... and wait for it to die. */
     577             :     for (;;)
     578         180 :     {
     579             :         int         rc;
     580             : 
     581             :         /* is it gone? */
     582         324 :         if (!worker->proc || worker->generation != generation)
     583             :             break;
     584             : 
     585         180 :         LWLockRelease(LogicalRepWorkerLock);
     586             : 
     587             :         /* Wait a bit --- we don't expect to have to wait long. */
     588         180 :         rc = WaitLatch(MyLatch,
     589             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     590             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     591             : 
     592         180 :         if (rc & WL_LATCH_SET)
     593             :         {
     594          34 :             ResetLatch(MyLatch);
     595          34 :             CHECK_FOR_INTERRUPTS();
     596             :         }
     597             : 
     598         180 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     599             :     }
     600             : }
     601             : 
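The generation check is an ABA guard: a slot index alone is not a stable
identity because slots are recycled, so callers snapshot the generation
under the lock and revalidate it after any release.  The idiom, as a
hypothetical helper:

    static bool
    still_same_worker(LogicalRepWorker *w, uint16 gen)
    {
        bool        same;

        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
        same = w->in_use && w->generation == gen;
        LWLockRelease(LogicalRepWorkerLock);

        return same;
    }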
     602             : /*
     603             :  * Stop the logical replication worker for subid/relid, if any.
     604             :  */
     605             : void
     606         164 : logicalrep_worker_stop(Oid subid, Oid relid)
     607             : {
     608             :     LogicalRepWorker *worker;
     609             : 
     610         164 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     611             : 
     612         164 :     worker = logicalrep_worker_find(subid, relid, false);
     613             : 
     614         164 :     if (worker)
     615             :     {
     616             :         Assert(!isParallelApplyWorker(worker));
     617         128 :         logicalrep_worker_stop_internal(worker, SIGTERM);
     618             :     }
     619             : 
     620         164 :     LWLockRelease(LogicalRepWorkerLock);
     621         164 : }
     622             : 
     623             : /*
     624             :  * Stop the given logical replication parallel apply worker.
     625             :  *
      626             :  * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
     627             :  * worker so that the worker exits cleanly.
     628             :  */
     629             : void
     630          10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
     631             : {
     632             :     int         slot_no;
     633             :     uint16      generation;
     634             :     LogicalRepWorker *worker;
     635             : 
     636          10 :     SpinLockAcquire(&winfo->shared->mutex);
     637          10 :     generation = winfo->shared->logicalrep_worker_generation;
     638          10 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
     639          10 :     SpinLockRelease(&winfo->shared->mutex);
     640             : 
     641             :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
     642             : 
     643             :     /*
     644             :      * Detach from the error_mq_handle for the parallel apply worker before
     645             :      * stopping it. This prevents the leader apply worker from trying to
     646             :      * receive the message from the error queue that might already be detached
     647             :      * by the parallel apply worker.
     648             :      */
     649          10 :     if (winfo->error_mq_handle)
     650             :     {
     651          10 :         shm_mq_detach(winfo->error_mq_handle);
     652          10 :         winfo->error_mq_handle = NULL;
     653             :     }
     654             : 
     655          10 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     656             : 
     657          10 :     worker = &LogicalRepCtx->workers[slot_no];
     658             :     Assert(isParallelApplyWorker(worker));
     659             : 
     660             :     /*
     661             :      * Only stop the worker if the generation matches and the worker is alive.
     662             :      */
     663          10 :     if (worker->generation == generation && worker->proc)
     664          10 :         logicalrep_worker_stop_internal(worker, SIGINT);
     665             : 
     666          10 :     LWLockRelease(LogicalRepWorkerLock);
     667          10 : }
     668             : 
     669             : /*
      670             :  * Wake up (using latch) any logical replication worker for the specified sub/rel.
     671             :  */
     672             : void
     673         402 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     674             : {
     675             :     LogicalRepWorker *worker;
     676             : 
     677         402 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     678             : 
     679         402 :     worker = logicalrep_worker_find(subid, relid, true);
     680             : 
     681         402 :     if (worker)
     682         402 :         logicalrep_worker_wakeup_ptr(worker);
     683             : 
     684         402 :     LWLockRelease(LogicalRepWorkerLock);
     685         402 : }
     686             : 
     687             : /*
     688             :  * Wake up (using latch) the specified logical replication worker.
     689             :  *
     690             :  * Caller must hold lock, else worker->proc could change under us.
     691             :  */
     692             : void
     693        1196 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     694             : {
     695             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     696             : 
     697        1196 :     SetLatch(&worker->proc->procLatch);
     698        1196 : }
     699             : 
     700             : /*
     701             :  * Attach to a slot.
     702             :  */
     703             : void
     704         806 : logicalrep_worker_attach(int slot)
     705             : {
     706             :     /* Block concurrent access. */
     707         806 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     708             : 
     709             :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     710         806 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     711             : 
     712         806 :     if (!MyLogicalRepWorker->in_use)
     713             :     {
     714           0 :         LWLockRelease(LogicalRepWorkerLock);
     715           0 :         ereport(ERROR,
     716             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     717             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     718             :                         slot)));
     719             :     }
     720             : 
     721         806 :     if (MyLogicalRepWorker->proc)
     722             :     {
     723           0 :         LWLockRelease(LogicalRepWorkerLock);
     724           0 :         ereport(ERROR,
     725             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     726             :                  errmsg("logical replication worker slot %d is already used by "
     727             :                         "another worker, cannot attach", slot)));
     728             :     }
     729             : 
     730         806 :     MyLogicalRepWorker->proc = MyProc;
     731         806 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     732             : 
     733         806 :     LWLockRelease(LogicalRepWorkerLock);
     734         806 : }
     735             : 
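The before_shmem_exit() call is what guarantees the slot is returned even on
error paths: callbacks registered this way run during process exit, while
shared memory is still attached, in reverse order of registration.  The
shape of the pattern, with hypothetical names:

    static void
    my_slot_cleanup(int code, Datum arg)
    {
        /* shmem still mapped here: release the slot, wake the supervisor */
    }

    static void
    my_worker_attach(void)
    {
        /* ... claim the slot under the lock, then: */
        before_shmem_exit(my_slot_cleanup, (Datum) 0);
    }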
     736             : /*
     737             :  * Stop the parallel apply workers if any, and detach the leader apply worker
     738             :  * (cleans up the worker info).
     739             :  */
     740             : static void
     741         806 : logicalrep_worker_detach(void)
     742             : {
     743             :     /* Stop the parallel apply workers. */
     744         806 :     if (am_leader_apply_worker())
     745             :     {
     746             :         List       *workers;
     747             :         ListCell   *lc;
     748             : 
     749             :         /*
      750             :          * Detach from the error_mq_handle for all parallel apply workers
      751             :          * before terminating them.  This prevents the leader apply worker
      752             :          * from receiving the worker termination message and logging it when
      753             :          * the parallel worker has already done so.
     754             :          */
     755         416 :         pa_detach_all_error_mq();
     756             : 
     757         416 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     758             : 
     759         416 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
     760         842 :         foreach(lc, workers)
     761             :         {
     762         426 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     763             : 
     764         426 :             if (isParallelApplyWorker(w))
     765           6 :                 logicalrep_worker_stop_internal(w, SIGTERM);
     766             :         }
     767             : 
     768         416 :         LWLockRelease(LogicalRepWorkerLock);
     769             :     }
     770             : 
     771             :     /* Block concurrent access. */
     772         806 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     773             : 
     774         806 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     775             : 
     776         806 :     LWLockRelease(LogicalRepWorkerLock);
     777         806 : }
     778             : 
     779             : /*
     780             :  * Clean up worker info.
     781             :  */
     782             : static void
     783         806 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     784             : {
     785             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     786             : 
     787         806 :     worker->type = WORKERTYPE_UNKNOWN;
     788         806 :     worker->in_use = false;
     789         806 :     worker->proc = NULL;
     790         806 :     worker->dbid = InvalidOid;
     791         806 :     worker->userid = InvalidOid;
     792         806 :     worker->subid = InvalidOid;
     793         806 :     worker->relid = InvalidOid;
     794         806 :     worker->leader_pid = InvalidPid;
     795         806 :     worker->parallel_apply = false;
     796         806 : }
     797             : 
     798             : /*
     799             :  * Cleanup function for logical replication launcher.
     800             :  *
     801             :  * Called on logical replication launcher exit.
     802             :  */
     803             : static void
     804         746 : logicalrep_launcher_onexit(int code, Datum arg)
     805             : {
     806         746 :     LogicalRepCtx->launcher_pid = 0;
     807         746 : }
     808             : 
     809             : /*
     810             :  * Cleanup function.
     811             :  *
     812             :  * Called on logical replication worker exit.
     813             :  */
     814             : static void
     815         806 : logicalrep_worker_onexit(int code, Datum arg)
     816             : {
     817             :     /* Disconnect gracefully from the remote side. */
     818         806 :     if (LogRepWorkerWalRcvConn)
     819         726 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
     820             : 
     821         806 :     logicalrep_worker_detach();
     822             : 
     823             :     /* Cleanup fileset used for streaming transactions. */
     824         806 :     if (MyLogicalRepWorker->stream_fileset != NULL)
     825          28 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     826             : 
     827             :     /*
     828             :      * Session level locks may be acquired outside of a transaction in
     829             :      * parallel apply mode and will not be released when the worker
     830             :      * terminates, so manually release all locks before the worker exits.
     831             :      *
     832             :      * The locks will be acquired once the worker is initialized.
     833             :      */
     834         806 :     if (!InitializingApplyWorker)
     835         798 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
     836             : 
     837         806 :     ApplyLauncherWakeup();
     838         806 : }
     839             : 
     840             : /*
     841             :  * Count the number of registered (not necessarily running) sync workers
     842             :  * for a subscription.
     843             :  */
     844             : int
     845        1992 : logicalrep_sync_worker_count(Oid subid)
     846             : {
     847             :     int         i;
     848        1992 :     int         res = 0;
     849             : 
     850             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     851             : 
     852             :     /* Search for attached worker for a given subscription id. */
     853       10296 :     for (i = 0; i < max_logical_replication_workers; i++)
     854             :     {
     855        8304 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     856             : 
     857        8304 :         if (isTablesyncWorker(w) && w->subid == subid)
     858        2172 :             res++;
     859             :     }
     860             : 
     861        1992 :     return res;
     862             : }
     863             : 
     864             : /*
     865             :  * Count the number of registered (but not necessarily running) parallel apply
     866             :  * workers for a subscription.
     867             :  */
     868             : static int
     869         654 : logicalrep_pa_worker_count(Oid subid)
     870             : {
     871             :     int         i;
     872         654 :     int         res = 0;
     873             : 
     874             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     875             : 
     876             :     /*
     877             :      * Scan all attached parallel apply workers, only counting those which
     878             :      * have the given subscription id.
     879             :      */
     880        3458 :     for (i = 0; i < max_logical_replication_workers; i++)
     881             :     {
     882        2804 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     883             : 
     884        2804 :         if (isParallelApplyWorker(w) && w->subid == subid)
     885           4 :             res++;
     886             :     }
     887             : 
     888         654 :     return res;
     889             : }
     890             : 
     891             : /*
     892             :  * ApplyLauncherShmemSize
     893             :  *      Compute space needed for replication launcher shared memory
     894             :  */
     895             : Size
     896        7402 : ApplyLauncherShmemSize(void)
     897             : {
     898             :     Size        size;
     899             : 
     900             :     /*
     901             :      * Need the fixed struct and the array of LogicalRepWorker.
     902             :      */
     903        7402 :     size = sizeof(LogicalRepCtxStruct);
     904        7402 :     size = MAXALIGN(size);
     905        7402 :     size = add_size(size, mul_size(max_logical_replication_workers,
     906             :                                    sizeof(LogicalRepWorker)));
     907        7402 :     return size;
     908             : }
     909             : 
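add_size() and mul_size() ereport on overflow instead of wrapping, which is
why shmem sizing never uses bare arithmetic.  An extension reserving a
similar fixed-header-plus-slot-array area would pair the same computation
with a request hook; a sketch in the PostgreSQL 15+ style, with hypothetical
types and an arbitrary eight slots:

    #include "postgres.h"
    #include "miscadmin.h"
    #include "storage/ipc.h"
    #include "storage/shmem.h"

    typedef struct MySlot       /* hypothetical per-worker state */
    {
        pid_t       pid;
        bool        in_use;
    } MySlot;

    typedef struct MyCtx        /* hypothetical fixed header */
    {
        pid_t       supervisor_pid;
        MySlot      slots[FLEXIBLE_ARRAY_MEMBER];
    } MyCtx;

    static shmem_request_hook_type prev_shmem_request_hook = NULL;

    static void
    my_shmem_request(void)
    {
        if (prev_shmem_request_hook)
            prev_shmem_request_hook();

        /* Same overflow-safe recipe as ApplyLauncherShmemSize() above. */
        RequestAddinShmemSpace(add_size(MAXALIGN(sizeof(MyCtx)),
                                        mul_size(8, sizeof(MySlot))));
    }

    void
    _PG_init(void)
    {
        prev_shmem_request_hook = shmem_request_hook;
        shmem_request_hook = my_shmem_request;
    }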
     910             : /*
     911             :  * ApplyLauncherRegister
     912             :  *      Register a background worker running the logical replication launcher.
     913             :  */
     914             : void
     915        1544 : ApplyLauncherRegister(void)
     916             : {
     917             :     BackgroundWorker bgw;
     918             : 
     919             :     /*
     920             :      * The logical replication launcher is disabled during binary upgrades, to
     921             :      * prevent logical replication workers from running on the source cluster.
     922             :      * That could cause replication origins to move forward after having been
     923             :      * copied to the target cluster, potentially creating conflicts with the
     924             :      * copied data files.
     925             :      */
     926        1544 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
     927          54 :         return;
     928             : 
     929        1490 :     memset(&bgw, 0, sizeof(bgw));
     930        1490 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     931             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     932        1490 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     933        1490 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     934        1490 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     935        1490 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     936             :              "logical replication launcher");
     937        1490 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
     938             :              "logical replication launcher");
     939        1490 :     bgw.bgw_restart_time = 5;
     940        1490 :     bgw.bgw_notify_pid = 0;
     941        1490 :     bgw.bgw_main_arg = (Datum) 0;
     942             : 
     943        1490 :     RegisterBackgroundWorker(&bgw);
     944             : }
     945             : 
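An extension registering its own supervisor at load time follows the same
recipe; only bgw_library_name changes, naming the extension's shared library
instead of the core binary.  A sketch with hypothetical names (my_extension,
my_launcher_main), to be loaded via shared_preload_libraries:

    void
    _PG_init(void)
    {
        BackgroundWorker bgw;

        memset(&bgw, 0, sizeof(bgw));
        bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
        bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
        bgw.bgw_restart_time = 5;   /* seconds; restart automatically on crash */
        snprintf(bgw.bgw_library_name, MAXPGPATH, "my_extension");
        snprintf(bgw.bgw_function_name, BGW_MAXLEN, "my_launcher_main");
        snprintf(bgw.bgw_name, BGW_MAXLEN, "my_extension launcher");
        snprintf(bgw.bgw_type, BGW_MAXLEN, "my_extension launcher");
        bgw.bgw_notify_pid = 0;     /* nobody to notify at postmaster startup */

        RegisterBackgroundWorker(&bgw);
    }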
     946             : /*
     947             :  * ApplyLauncherShmemInit
     948             :  *      Allocate and initialize replication launcher shared memory
     949             :  */
     950             : void
     951        1918 : ApplyLauncherShmemInit(void)
     952             : {
     953             :     bool        found;
     954             : 
     955        1918 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     956        1918 :         ShmemInitStruct("Logical Replication Launcher Data",
     957             :                         ApplyLauncherShmemSize(),
     958             :                         &found);
     959             : 
     960        1918 :     if (!found)
     961             :     {
     962             :         int         slot;
     963             : 
     964        1918 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     965             : 
     966        1918 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
     967        1918 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
     968             : 
     969             :         /* Initialize memory and spin locks for each worker slot. */
     970        9532 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
     971             :         {
     972        7614 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
     973             : 
     974        7614 :             memset(worker, 0, sizeof(LogicalRepWorker));
     975        7614 :             SpinLockInit(&worker->relmutex);
     976             :         }
     977             :     }
     978        1918 : }
     979             : 
     980             : /*
     981             :  * Initialize or attach to the dynamic shared hash table that stores the
     982             :  * last-start times, if not already done.
     983             :  * This must be called before accessing the table.
     984             :  */
     985             : static void
     986         860 : logicalrep_launcher_attach_dshmem(void)
     987             : {
     988             :     MemoryContext oldcontext;
     989             : 
     990             :     /* Quick exit if we already did this. */
     991         860 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
     992         762 :         last_start_times != NULL)
     993         540 :         return;
     994             : 
     995             :     /* Otherwise, use a lock to ensure only one process creates the table. */
     996         320 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     997             : 
     998             :     /* Be sure any local memory allocated by DSA routines is persistent. */
     999         320 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1000             : 
    1001         320 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
    1002             :     {
    1003             :         /* Initialize dynamic shared hash table for last-start times. */
    1004          98 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
    1005          98 :         dsa_pin(last_start_times_dsa);
    1006          98 :         dsa_pin_mapping(last_start_times_dsa);
    1007          98 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
    1008             : 
    1009             :         /* Store handles in shared memory for other backends to use. */
    1010          98 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
    1011          98 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
    1012             :     }
    1013         222 :     else if (!last_start_times)
    1014             :     {
    1015             :         /* Attach to existing dynamic shared hash table. */
    1016         222 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
    1017         222 :         dsa_pin_mapping(last_start_times_dsa);
    1018         222 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
    1019         222 :                                          LogicalRepCtx->last_start_dsh, 0);
    1020             :     }
    1021             : 
    1022         320 :     MemoryContextSwitchTo(oldcontext);
    1023         320 :     LWLockRelease(LogicalRepWorkerLock);
    1024             : }
    1025             : 
    1026             : /*
    1027             :  * Set the last-start time for the subscription.
    1028             :  */
    1029             : static void
    1030         268 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
    1031             : {
    1032             :     LauncherLastStartTimesEntry *entry;
    1033             :     bool        found;
    1034             : 
    1035         268 :     logicalrep_launcher_attach_dshmem();
    1036             : 
    1037         268 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
    1038         268 :     entry->last_start_time = start_time;
    1039         268 :     dshash_release_lock(last_start_times, entry);
    1040         268 : }
    1041             : 
    1042             : /*
    1043             :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1044             :  */
    1045             : static TimestampTz
    1046         310 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1047             : {
    1048             :     LauncherLastStartTimesEntry *entry;
    1049             :     TimestampTz ret;
    1050             : 
    1051         310 :     logicalrep_launcher_attach_dshmem();
    1052             : 
    1053         310 :     entry = dshash_find(last_start_times, &subid, false);
    1054         310 :     if (entry == NULL)
    1055         230 :         return 0;
    1056             : 
    1057          80 :     ret = entry->last_start_time;
    1058          80 :     dshash_release_lock(last_start_times, entry);
    1059             : 
    1060          80 :     return ret;
    1061             : }
    1062             : 
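Together, the set/get pair implements restart throttling: the launcher only
relaunches an apply worker once wal_retrieve_retry_interval has elapsed
since the recorded start.  A condensed sketch of that use, mirroring the
main loop below (sub is a Subscription from get_subscription_list()):

    TimestampTz last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    TimestampTz now = GetCurrentTimestamp();

    if (last_start == 0 ||
        TimestampDifferenceExceeds(last_start, now,
                                   wal_retrieve_retry_interval))
    {
        ApplyLauncherSetWorkerStartTime(sub->oid, now);
        logicalrep_worker_launch(WORKERTYPE_APPLY,
                                 sub->dbid, sub->oid, sub->name, sub->owner,
                                 InvalidOid, DSM_HANDLE_INVALID);
    }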
    1063             : /*
    1064             :  * Remove the last-start-time entry for the subscription, if one exists.
    1065             :  *
    1066             :  * This has two use-cases: to remove the entry related to a subscription
    1067             :  * that's been deleted or disabled (just to avoid leaking shared memory),
    1068             :  * and to allow immediate restart of an apply worker that has exited
    1069             :  * due to subscription parameter changes.
    1070             :  */
    1071             : void
    1072         282 : ApplyLauncherForgetWorkerStartTime(Oid subid)
    1073             : {
    1074         282 :     logicalrep_launcher_attach_dshmem();
    1075             : 
    1076         282 :     (void) dshash_delete_key(last_start_times, &subid);
    1077         282 : }
    1078             : 
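The three helpers above share one dshash access pattern: dshash_find() and
dshash_find_or_insert() hand back the entry with its partition lock still
held, so every successful lookup must be paired with dshash_release_lock()
before returning (dshash_delete_key() handles its locking internally). A
minimal sketch of the write path, using an illustrative entry type shaped
like LauncherLastStartTimesEntry:

    /* Illustrative entry type; the hash key is the leading field. */
    typedef struct DemoEntry
    {
        Oid         key;
        TimestampTz value;
    } DemoEntry;

    static void
    demo_set(dshash_table *ht, Oid key, TimestampTz value)
    {
        DemoEntry  *entry;
        bool        found;

        /* Returns the entry with its partition lock held exclusively. */
        entry = dshash_find_or_insert(ht, &key, &found);
        entry->value = value;
        dshash_release_lock(ht, entry); /* release before returning */
    }

ApplyLauncherForgetWorkerStartTime() is the escape hatch described in its
comment: deleting the entry lets the next launcher cycle restart the worker
without waiting out wal_retrieve_retry_interval.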
    1079             : /*
    1080             :  * Wake up the launcher on commit if requested.
    1081             :  */
    1082             : void
    1083      786030 : AtEOXact_ApplyLauncher(bool isCommit)
    1084             : {
    1085      786030 :     if (isCommit)
    1086             :     {
    1087      738162 :         if (on_commit_launcher_wakeup)
    1088         254 :             ApplyLauncherWakeup();
    1089             :     }
    1090             : 
    1091      786030 :     on_commit_launcher_wakeup = false;
    1092      786030 : }
    1093             : 
    1094             : /*
    1095             :  * Request wakeup of the launcher on commit of the transaction.
    1096             :  *
    1097             :  * This is used to signal the launcher to stop sleeping and process the
    1098             :  * subscriptions when the current transaction commits.  It should be used
    1099             :  * when a new tuple has been added to the pg_subscription catalog.
    1100             :  */
    1101             : void
    1102         254 : ApplyLauncherWakeupAtCommit(void)
    1103             : {
    1104         254 :     if (!on_commit_launcher_wakeup)
    1105         254 :         on_commit_launcher_wakeup = true;
    1106         254 : }
    1107             : 
    1108             : static void
    1109        1060 : ApplyLauncherWakeup(void)
    1110             : {
    1111        1060 :     if (LogicalRepCtx->launcher_pid != 0)
    1112        1002 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1113        1060 : }
    1114             : 
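Together these three functions implement a deferred wakeup: DDL that adds or
enables a subscription merely sets a backend-local flag, and the signal is
only sent from AtEOXact_ApplyLauncher() once the transaction has actually
committed, so the launcher never reacts to catalog changes that might still
roll back. A sketch of the calling pattern, with the catalog manipulation
elided:

    /* In CREATE SUBSCRIPTION, after inserting the pg_subscription tuple: */
    ApplyLauncherWakeupAtCommit();

    /*
     * At end of transaction, xact.c then calls:
     *   AtEOXact_ApplyLauncher(true)   on commit -- SIGUSR1 to the launcher
     *   AtEOXact_ApplyLauncher(false)  on abort  -- pending wakeup discarded
     */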
    1115             : /*
    1116             :  * Main loop for the apply launcher process.
    1117             :  */
    1118             : void
    1119         746 : ApplyLauncherMain(Datum main_arg)
    1120             : {
    1121         746 :     ereport(DEBUG1,
    1122             :             (errmsg_internal("logical replication launcher started")));
    1123             : 
    1124         746 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1125             : 
    1126             :     Assert(LogicalRepCtx->launcher_pid == 0);
    1127         746 :     LogicalRepCtx->launcher_pid = MyProcPid;
    1128             : 
    1129             :     /* Establish signal handlers. */
    1130         746 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1131         746 :     pqsignal(SIGTERM, die);
    1132         746 :     BackgroundWorkerUnblockSignals();
    1133             : 
    1134             :     /*
    1135             :      * Establish connection to nailed catalogs (we only ever access
    1136             :      * pg_subscription).
    1137             :      */
    1138         746 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1139             : 
    1140             :     /* Enter main loop */
    1141             :     for (;;)
    1142        3760 :     {
    1143             :         int         rc;
    1144             :         List       *sublist;
    1145             :         ListCell   *lc;
    1146             :         MemoryContext subctx;
    1147             :         MemoryContext oldctx;
    1148        4506 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1149             : 
    1150        4506 :         CHECK_FOR_INTERRUPTS();
    1151             : 
    1152             :         /* Use temporary context to avoid leaking memory across cycles. */
    1153        4474 :         subctx = AllocSetContextCreate(TopMemoryContext,
    1154             :                                        "Logical Replication Launcher sublist",
    1155             :                                        ALLOCSET_DEFAULT_SIZES);
    1156        4474 :         oldctx = MemoryContextSwitchTo(subctx);
    1157             : 
    1158             :         /* Start any missing workers for enabled subscriptions. */
    1159        4474 :         sublist = get_subscription_list();
    1160        5262 :         foreach(lc, sublist)
    1161             :         {
    1162         790 :             Subscription *sub = (Subscription *) lfirst(lc);
    1163             :             LogicalRepWorker *w;
    1164             :             TimestampTz last_start;
    1165             :             TimestampTz now;
    1166             :             long        elapsed;
    1167             : 
    1168         790 :             if (!sub->enabled)
    1169          68 :                 continue;
    1170             : 
    1171         722 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1172         722 :             w = logicalrep_worker_find(sub->oid, InvalidOid, false);
    1173         722 :             LWLockRelease(LogicalRepWorkerLock);
    1174             : 
    1175         722 :             if (w != NULL)
    1176         412 :                 continue;       /* worker is running already */
    1177             : 
    1178             :             /*
    1179             :              * If the worker is eligible to start now, launch it.  Otherwise,
    1180             :              * adjust wait_time so that we'll wake up as soon as it can be
    1181             :              * started.
    1182             :              *
    1183             :              * Each subscription's apply worker can only be restarted once per
    1184             :              * wal_retrieve_retry_interval, so that errors do not cause us to
    1185             :              * repeatedly restart the worker as fast as possible.  In cases
    1186             :              * where a restart is expected (e.g., subscription parameter
    1187             :              * changes), another process should remove the last-start entry
    1188             :              * for the subscription so that the worker can be restarted
    1189             :              * without waiting for wal_retrieve_retry_interval to elapse.
    1190             :              */
    1191         310 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1192         310 :             now = GetCurrentTimestamp();
    1193         310 :             if (last_start == 0 ||
    1194          80 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1195             :             {
    1196         268 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1197         268 :                 logicalrep_worker_launch(WORKERTYPE_APPLY,
    1198         268 :                                          sub->dbid, sub->oid, sub->name,
    1199             :                                          sub->owner, InvalidOid,
    1200             :                                          DSM_HANDLE_INVALID);
    1201             :             }
    1202             :             else
    1203             :             {
    1204          42 :                 wait_time = Min(wait_time,
    1205             :                                 wal_retrieve_retry_interval - elapsed);
    1206             :             }
    1207             :         }
    1208             : 
    1209             :         /* Switch back to original memory context. */
    1210        4472 :         MemoryContextSwitchTo(oldctx);
    1211             :         /* Clean the temporary memory. */
    1212        4472 :         MemoryContextDelete(subctx);
    1213             : 
    1214             :         /* Wait for more work. */
    1215        4472 :         rc = WaitLatch(MyLatch,
    1216             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1217             :                        wait_time,
    1218             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1219             : 
    1220        4466 :         if (rc & WL_LATCH_SET)
    1221             :         {
    1222        4452 :             ResetLatch(MyLatch);
    1223        4452 :             CHECK_FOR_INTERRUPTS();
    1224             :         }
    1225             : 
    1226        3760 :         if (ConfigReloadPending)
    1227             :         {
    1228          58 :             ConfigReloadPending = false;
    1229          58 :             ProcessConfigFile(PGC_SIGHUP);
    1230             :         }
    1231             :     }
    1232             : 
    1233             :     /* Not reachable */
    1234             : }
    1235             : 
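The subtlest part of the loop above is the restart throttle: wait_time
starts at DEFAULT_NAPTIME_PER_CYCLE and is only ever clamped downward, so
the launcher sleeps exactly until the earliest blocked subscription becomes
eligible again. The decision restated as a standalone helper (a sketch that
mirrors the logic; the function itself is not part of this file):

    /*
     * May a worker last started at 'last_start' be relaunched at 'now'?
     * If not, clamp *wait_time to the remaining delay.
     */
    static bool
    may_restart_now(TimestampTz last_start, TimestampTz now, long *wait_time)
    {
        long        elapsed;

        if (last_start == 0)
            return true;        /* never started; no throttling applies */

        elapsed = TimestampDifferenceMilliseconds(last_start, now);
        if (elapsed >= wal_retrieve_retry_interval)
            return true;        /* full retry interval has passed */

        /* Sleep only until this restart becomes eligible. */
        *wait_time = Min(*wait_time, wal_retrieve_retry_interval - elapsed);
        return false;
    }

For example, with wal_retrieve_retry_interval = 5000 and a worker that died
2000 ms after its last start, nothing is launched now, but the launcher
sleeps at most 3000 ms before rechecking.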
    1236             : /*
    1237             :  * Is the current process the logical replication launcher?
    1238             :  */
    1239             : bool
    1240         782 : IsLogicalLauncher(void)
    1241             : {
    1242         782 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1243             : }
    1244             : 
    1245             : /*
    1246             :  * Return the pid of the leader apply worker if the given pid is the pid of a
    1247             :  * parallel apply worker; otherwise, return InvalidPid.
    1248             :  */
    1249             : pid_t
    1250        1520 : GetLeaderApplyWorkerPid(pid_t pid)
    1251             : {
    1252        1520 :     int         leader_pid = InvalidPid;
    1253             :     int         i;
    1254             : 
    1255        1520 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1256             : 
    1257        7600 :     for (i = 0; i < max_logical_replication_workers; i++)
    1258             :     {
    1259        6080 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1260             : 
    1261        6080 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
    1262             :         {
    1263           0 :             leader_pid = w->leader_pid;
    1264           0 :             break;
    1265             :         }
    1266             :     }
    1267             : 
    1268        1520 :     LWLockRelease(LogicalRepWorkerLock);
    1269             : 
    1270        1520 :     return leader_pid;
    1271             : }
    1272             : 
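GetLeaderApplyWorkerPid() shows the standard idiom for reading the shared
worker array: take LogicalRepWorkerLock in LW_SHARED mode, scan all
max_logical_replication_workers slots, release. The same idiom in a
hypothetical helper (it would have to live in this file, since LogicalRepCtx
is static to launcher.c):

    static int
    count_in_use_workers(void)
    {
        int         i;
        int         nworkers = 0;

        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
        for (i = 0; i < max_logical_replication_workers; i++)
        {
            LogicalRepWorker *w = &LogicalRepCtx->workers[i];

            if (w->in_use)
                nworkers++;
        }
        LWLockRelease(LogicalRepWorkerLock);

        return nworkers;
    }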
    1273             : /*
    1274             :  * Returns the state of the subscriptions.
    1275             :  */
    1276             : Datum
    1277           2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1278             : {
    1279             : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
    1280           2 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1281             :     int         i;
    1282           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1283             : 
    1284           2 :     InitMaterializedSRF(fcinfo, 0);
    1285             : 
    1286             :     /* Make sure we get a consistent view of the workers. */
    1287           2 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1288             : 
    1289          10 :     for (i = 0; i < max_logical_replication_workers; i++)
    1290             :     {
    1291             :         /* for each row */
    1292           8 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1293           8 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1294             :         int         worker_pid;
    1295             :         LogicalRepWorker worker;
    1296             : 
    1297           8 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1298             :                sizeof(LogicalRepWorker));
    1299           8 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1300           4 :             continue;
    1301             : 
    1302           4 :         if (OidIsValid(subid) && worker.subid != subid)
    1303           0 :             continue;
    1304             : 
    1305           4 :         worker_pid = worker.proc->pid;
    1306             : 
    1307           4 :         values[0] = ObjectIdGetDatum(worker.subid);
    1308           4 :         if (isTablesyncWorker(&worker))
    1309           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1310             :         else
    1311           4 :             nulls[1] = true;
    1312           4 :         values[2] = Int32GetDatum(worker_pid);
    1313             : 
    1314           4 :         if (isParallelApplyWorker(&worker))
    1315           0 :             values[3] = Int32GetDatum(worker.leader_pid);
    1316             :         else
    1317           4 :             nulls[3] = true;
    1318             : 
    1319           4 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
    1320           2 :             nulls[4] = true;
    1321             :         else
    1322           2 :             values[4] = LSNGetDatum(worker.last_lsn);
    1323           4 :         if (worker.last_send_time == 0)
    1324           0 :             nulls[5] = true;
    1325             :         else
    1326           4 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
    1327           4 :         if (worker.last_recv_time == 0)
    1328           0 :             nulls[6] = true;
    1329             :         else
    1330           4 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1331           4 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1332           2 :             nulls[7] = true;
    1333             :         else
    1334           2 :             values[7] = LSNGetDatum(worker.reply_lsn);
    1335           4 :         if (worker.reply_time == 0)
    1336           0 :             nulls[8] = true;
    1337             :         else
    1338           4 :             values[8] = TimestampTzGetDatum(worker.reply_time);
    1339             : 
    1340           4 :         switch (worker.type)
    1341             :         {
    1342           4 :             case WORKERTYPE_APPLY:
    1343           4 :                 values[9] = CStringGetTextDatum("apply");
    1344           4 :                 break;
    1345           0 :             case WORKERTYPE_PARALLEL_APPLY:
    1346           0 :                 values[9] = CStringGetTextDatum("parallel apply");
    1347           0 :                 break;
    1348           0 :             case WORKERTYPE_TABLESYNC:
    1349           0 :                 values[9] = CStringGetTextDatum("table synchronization");
    1350           0 :                 break;
    1351           0 :             case WORKERTYPE_UNKNOWN:
    1352             :                 /* Should never happen. */
    1353           0 :                 elog(ERROR, "unknown worker type");
    1354             :         }
    1355             : 
    1356           4 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1357             :                              values, nulls);
    1358             : 
    1359             :         /*
    1360             :          * If only a single subscription was requested, and we found it,
    1361             :          * break.
    1362             :          */
    1363           4 :         if (OidIsValid(subid))
    1364           0 :             break;
    1365             :     }
    1366             : 
    1367           2 :     LWLockRelease(LogicalRepWorkerLock);
    1368             : 
    1369           2 :     return (Datum) 0;
    1370             : }
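pg_stat_get_subscription() backs the pg_stat_subscription view. Note the
defensive memcpy() of each slot into a local copy: taken while holding the
lock in shared mode, it keeps each emitted row internally consistent even if
the worker exits right afterwards. The same materialized-SRF scaffolding
reduced to its minimum, as a hypothetical extension function (it would still
need a matching CREATE FUNCTION ... RETURNS SETOF declaration to be
callable):

    #include "postgres.h"
    #include "funcapi.h"
    #include "utils/tuplestore.h"

    PG_FUNCTION_INFO_V1(my_srf);

    Datum
    my_srf(PG_FUNCTION_ARGS)
    {
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
        Datum       values[2] = {0};
        bool        nulls[2] = {0};

        /* Sets up rsinfo->setResult / setDesc for a materialized result. */
        InitMaterializedSRF(fcinfo, 0);

        /* Emit a single row: (42, NULL). */
        values[0] = Int32GetDatum(42);
        nulls[1] = true;
        tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);

        return (Datum) 0;
    }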

Generated by: LCOV version 1.14