LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions)
Test: PostgreSQL 17devel
Date: 2024-04-25 06:13:26
Coverage: Lines: 406 of 461 hit (88.1 %); Functions: 31 of 31 hit (100.0 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2024, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
      11             :  *    This module contains the logical replication worker launcher which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "access/heapam.h"
      21             : #include "access/htup.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/pg_subscription.h"
      26             : #include "catalog/pg_subscription_rel.h"
      27             : #include "funcapi.h"
      28             : #include "lib/dshash.h"
      29             : #include "miscadmin.h"
      30             : #include "pgstat.h"
      31             : #include "postmaster/bgworker.h"
      32             : #include "postmaster/interrupt.h"
      33             : #include "replication/logicallauncher.h"
      34             : #include "replication/slot.h"
      35             : #include "replication/walreceiver.h"
      36             : #include "replication/worker_internal.h"
      37             : #include "storage/ipc.h"
      38             : #include "storage/proc.h"
      39             : #include "storage/procarray.h"
      40             : #include "tcop/tcopprot.h"
      41             : #include "utils/builtins.h"
      42             : #include "utils/memutils.h"
      43             : #include "utils/pg_lsn.h"
      44             : #include "utils/snapmgr.h"
      45             : 
      46             : /* max sleep time between cycles (3min) */
      47             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      48             : 
      49             : /* GUC variables */
      50             : int         max_logical_replication_workers = 4;
      51             : int         max_sync_workers_per_subscription = 2;
      52             : int         max_parallel_apply_workers_per_subscription = 2;
      53             : 
      54             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      55             : 
      56             : typedef struct LogicalRepCtxStruct
      57             : {
      58             :     /* Supervisor process. */
      59             :     pid_t       launcher_pid;
      60             : 
      61             :     /* Hash table holding last start times of subscriptions' apply workers. */
      62             :     dsa_handle  last_start_dsa;
      63             :     dshash_table_handle last_start_dsh;
      64             : 
      65             :     /* Background workers. */
      66             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      67             : } LogicalRepCtxStruct;
      68             : 
      69             : static LogicalRepCtxStruct *LogicalRepCtx;
      70             : 
      71             : /* an entry in the last-start-times shared hash table */
      72             : typedef struct LauncherLastStartTimesEntry
      73             : {
      74             :     Oid         subid;          /* OID of logrep subscription (hash key) */
      75             :     TimestampTz last_start_time;    /* last time its apply worker was started */
      76             : } LauncherLastStartTimesEntry;
      77             : 
      78             : /* parameters for the last-start-times shared hash table */
      79             : static const dshash_parameters dsh_params = {
      80             :     sizeof(Oid),
      81             :     sizeof(LauncherLastStartTimesEntry),
      82             :     dshash_memcmp,
      83             :     dshash_memhash,
      84             :     dshash_memcpy,
      85             :     LWTRANCHE_LAUNCHER_HASH
      86             : };
      87             : 
      88             : static dsa_area *last_start_times_dsa = NULL;
      89             : static dshash_table *last_start_times = NULL;
      90             : 
      91             : static bool on_commit_launcher_wakeup = false;
      92             : 
      93             : 
      94             : static void ApplyLauncherWakeup(void);
      95             : static void logicalrep_launcher_onexit(int code, Datum arg);
      96             : static void logicalrep_worker_onexit(int code, Datum arg);
      97             : static void logicalrep_worker_detach(void);
      98             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
      99             : static int  logicalrep_pa_worker_count(Oid subid);
     100             : static void logicalrep_launcher_attach_dshmem(void);
     101             : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
     102             : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
     103             : 
     104             : 
     105             : /*
     106             :  * Load the list of subscriptions.
     107             :  *
     108             :  * Only the fields interesting for worker start/stop functions are filled for
     109             :  * each subscription.
     110             :  */
     111             : static List *
     112        4186 : get_subscription_list(void)
     113             : {
     114        4186 :     List       *res = NIL;
     115             :     Relation    rel;
     116             :     TableScanDesc scan;
     117             :     HeapTuple   tup;
     118             :     MemoryContext resultcxt;
     119             : 
     120             :     /* This is the context that we will allocate our output data in */
     121        4186 :     resultcxt = CurrentMemoryContext;
     122             : 
     123             :     /*
      124             :      * Start a transaction so we can access pg_subscription, and get a snapshot.
     125             :      * We don't have a use for the snapshot itself, but we're interested in
     126             :      * the secondary effect that it sets RecentGlobalXmin.  (This is critical
     127             :      * for anything that reads heap pages, because HOT may decide to prune
     128             :      * them even if the process doesn't attempt to modify any tuples.)
     129             :      *
     130             :      * FIXME: This comment is inaccurate / the code buggy. A snapshot that is
     131             :      * not pushed/active does not reliably prevent HOT pruning (->xmin could
     132             :      * e.g. be cleared when cache invalidations are processed).
     133             :      */
     134        4186 :     StartTransactionCommand();
     135        4186 :     (void) GetTransactionSnapshot();
     136             : 
     137        4186 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     138        4186 :     scan = table_beginscan_catalog(rel, 0, NULL);
     139             : 
     140        4932 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     141             :     {
     142         746 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     143             :         Subscription *sub;
     144             :         MemoryContext oldcxt;
     145             : 
     146             :         /*
     147             :          * Allocate our results in the caller's context, not the
     148             :          * transaction's. We do this inside the loop, and restore the original
     149             :          * context at the end, so that leaky things like heap_getnext() are
     150             :          * not called in a potentially long-lived context.
     151             :          */
     152         746 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     153             : 
     154         746 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     155         746 :         sub->oid = subform->oid;
     156         746 :         sub->dbid = subform->subdbid;
     157         746 :         sub->owner = subform->subowner;
     158         746 :         sub->enabled = subform->subenabled;
     159         746 :         sub->name = pstrdup(NameStr(subform->subname));
     160             :         /* We don't fill fields we are not interested in. */
     161             : 
     162         746 :         res = lappend(res, sub);
     163         746 :         MemoryContextSwitchTo(oldcxt);
     164             :     }
     165             : 
     166        4186 :     table_endscan(scan);
     167        4186 :     table_close(rel, AccessShareLock);
     168             : 
     169        4186 :     CommitTransactionCommand();
     170             : 
     171        4186 :     return res;
     172             : }
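
A minimal sketch of how this list is consumed once per launcher cycle, modeled on ApplyLauncherMain later in this file; the loop shown is illustrative, not measured code:

    List       *sublist;
    ListCell   *lc;

    sublist = get_subscription_list();
    foreach(lc, sublist)
    {
        Subscription *sub = (Subscription *) lfirst(lc);

        if (!sub->enabled)
            continue;           /* only enabled subscriptions get apply workers */

        /* ... decide whether an apply worker must be (re)started ... */
    }
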
     173             : 
     174             : /*
     175             :  * Wait for a background worker to start up and attach to the shmem context.
     176             :  *
     177             :  * This is only needed for cleaning up the shared memory in case the worker
     178             :  * fails to attach.
     179             :  *
     180             :  * Returns whether the attach was successful.
     181             :  */
     182             : static bool
     183        3402 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     184             :                                uint16 generation,
     185             :                                BackgroundWorkerHandle *handle)
     186             : {
     187             :     BgwHandleStatus status;
     188             :     int         rc;
     189             : 
     190             :     for (;;)
     191        2798 :     {
     192             :         pid_t       pid;
     193             : 
     194        3402 :         CHECK_FOR_INTERRUPTS();
     195             : 
     196        3402 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     197             : 
     198             :         /* Worker either died or has started. Return false if died. */
     199        3402 :         if (!worker->in_use || worker->proc)
     200             :         {
     201         600 :             LWLockRelease(LogicalRepWorkerLock);
     202         600 :             return worker->in_use;
     203             :         }
     204             : 
     205        2802 :         LWLockRelease(LogicalRepWorkerLock);
     206             : 
     207             :         /* Check if worker has died before attaching, and clean up after it. */
     208        2802 :         status = GetBackgroundWorkerPid(handle, &pid);
     209             : 
     210        2802 :         if (status == BGWH_STOPPED)
     211             :         {
     212           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     213             :             /* Ensure that this was indeed the worker we waited for. */
     214           0 :             if (generation == worker->generation)
     215           0 :                 logicalrep_worker_cleanup(worker);
     216           0 :             LWLockRelease(LogicalRepWorkerLock);
     217           0 :             return false;
     218             :         }
     219             : 
     220             :         /*
      221             :          * We need a timeout because we generally don't get notified via latch
     222             :          * about the worker attach.  But we don't expect to have to wait long.
     223             :          */
     224        2802 :         rc = WaitLatch(MyLatch,
     225             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     226             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     227             : 
     228        2802 :         if (rc & WL_LATCH_SET)
     229             :         {
     230         846 :             ResetLatch(MyLatch);
     231         846 :             CHECK_FOR_INTERRUPTS();
     232             :         }
     233             :     }
     234             : }
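
The generation counter is what lets this function detect that a slot has been recycled for a different worker between checks; a compact sketch of the idiom (locking levels as in the BGWH_STOPPED branch above; the timing gap is illustrative):

    uint16      gen = worker->generation;   /* captured while holding the lock */

    /* ... lock released; the slot may be freed and reused meanwhile ... */

    LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    if (worker->generation == gen)          /* still the worker we launched? */
        logicalrep_worker_cleanup(worker);  /* safe: the slot was not recycled */
    LWLockRelease(LogicalRepWorkerLock);
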
     235             : 
     236             : /*
      237             :  * Walks the workers array and searches for one that matches the given
     238             :  * subscription id and relid.
     239             :  *
     240             :  * We are only interested in the leader apply worker or table sync worker.
     241             :  */
     242             : LogicalRepWorker *
     243        3834 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     244             : {
     245             :     int         i;
     246        3834 :     LogicalRepWorker *res = NULL;
     247             : 
     248             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     249             : 
     250             :     /* Search for attached worker for a given subscription id. */
     251       11496 :     for (i = 0; i < max_logical_replication_workers; i++)
     252             :     {
     253       10100 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     254             : 
     255             :         /* Skip parallel apply workers. */
     256       10100 :         if (isParallelApplyWorker(w))
     257           0 :             continue;
     258             : 
     259       10100 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     260        2438 :             (!only_running || w->proc))
     261             :         {
     262        2438 :             res = w;
     263        2438 :             break;
     264             :         }
     265             :     }
     266             : 
     267        3834 :     return res;
     268             : }
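
Because the returned pointer points into shared memory, callers must hold LogicalRepWorkerLock across the lookup and any use of the result; a minimal sketch of the pattern (it mirrors logicalrep_worker_wakeup() below):

    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

    worker = logicalrep_worker_find(subid, InvalidOid, true);
    if (worker)
        logicalrep_worker_wakeup_ptr(worker);   /* pointer only valid under lock */

    LWLockRelease(LogicalRepWorkerLock);
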
     269             : 
     270             : /*
     271             :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
     272             :  * the subscription, instead of just one.
     273             :  */
     274             : List *
     275         900 : logicalrep_workers_find(Oid subid, bool only_running)
     276             : {
     277             :     int         i;
     278         900 :     List       *res = NIL;
     279             : 
     280             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     281             : 
     282             :     /* Search for attached worker for a given subscription id. */
     283        4724 :     for (i = 0; i < max_logical_replication_workers; i++)
     284             :     {
     285        3824 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     286             : 
     287        3824 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     288         588 :             res = lappend(res, w);
     289             :     }
     290             : 
     291         900 :     return res;
     292             : }
     293             : 
     294             : /*
     295             :  * Start new logical replication background worker, if possible.
     296             :  *
     297             :  * Returns true on success, false on failure.
     298             :  */
     299             : bool
     300         604 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
     301             :                          Oid dbid, Oid subid, const char *subname, Oid userid,
     302             :                          Oid relid, dsm_handle subworker_dsm)
     303             : {
     304             :     BackgroundWorker bgw;
     305             :     BackgroundWorkerHandle *bgw_handle;
     306             :     uint16      generation;
     307             :     int         i;
     308         604 :     int         slot = 0;
     309         604 :     LogicalRepWorker *worker = NULL;
     310             :     int         nsyncworkers;
     311             :     int         nparallelapplyworkers;
     312             :     TimestampTz now;
     313         604 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
     314         604 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
     315             : 
     316             :     /*----------
     317             :      * Sanity checks:
     318             :      * - must be valid worker type
      319             :      * - tablesync workers are the only ones to have relid
     320             :      * - parallel apply worker is the only kind of subworker
     321             :      */
     322             :     Assert(wtype != WORKERTYPE_UNKNOWN);
     323             :     Assert(is_tablesync_worker == OidIsValid(relid));
     324             :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
     325             : 
     326         604 :     ereport(DEBUG1,
     327             :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     328             :                              subname)));
     329             : 
     330             :     /* Report this after the initial starting message for consistency. */
     331         604 :     if (max_replication_slots == 0)
     332           0 :         ereport(ERROR,
     333             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     334             :                  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
     335             : 
     336             :     /*
      337             :      * We need to modify the shared memory under the lock so that we have a
      338             :      * consistent view.
     339             :      */
     340         604 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     341             : 
     342         604 : retry:
     343             :     /* Find unused worker slot. */
     344        1096 :     for (i = 0; i < max_logical_replication_workers; i++)
     345             :     {
     346        1096 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     347             : 
     348        1096 :         if (!w->in_use)
     349             :         {
     350         604 :             worker = w;
     351         604 :             slot = i;
     352         604 :             break;
     353             :         }
     354             :     }
     355             : 
     356         604 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     357             : 
     358         604 :     now = GetCurrentTimestamp();
     359             : 
     360             :     /*
      361             :      * If we didn't find a free slot, try to do garbage collection.  We do
      362             :      * this because if some worker failed to start up and its parent crashed
      363             :      * while waiting, the in_use state was never cleared.
     364             :      */
     365         604 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     366             :     {
     367           0 :         bool        did_cleanup = false;
     368             : 
     369           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     370             :         {
     371           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     372             : 
     373             :             /*
     374             :              * If the worker was marked in use but didn't manage to attach in
     375             :              * time, clean it up.
     376             :              */
     377           0 :             if (w->in_use && !w->proc &&
     378           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     379             :                                            wal_receiver_timeout))
     380             :             {
     381           0 :                 elog(WARNING,
     382             :                      "logical replication worker for subscription %u took too long to start; canceled",
     383             :                      w->subid);
     384             : 
     385           0 :                 logicalrep_worker_cleanup(w);
     386           0 :                 did_cleanup = true;
     387             :             }
     388             :         }
     389             : 
     390           0 :         if (did_cleanup)
     391           0 :             goto retry;
     392             :     }
     393             : 
     394             :     /*
      395             :      * We don't allow more sync workers to be started once we have reached the
      396             :      * sync worker limit per subscription. So, just return silently as we
     397             :      * might get here because of an otherwise harmless race condition.
     398             :      */
     399         604 :     if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
     400             :     {
     401           0 :         LWLockRelease(LogicalRepWorkerLock);
     402           0 :         return false;
     403             :     }
     404             : 
     405         604 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
     406             : 
     407             :     /*
     408             :      * Return false if the number of parallel apply workers reached the limit
     409             :      * per subscription.
     410             :      */
     411         604 :     if (is_parallel_apply_worker &&
     412          20 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
     413             :     {
     414           0 :         LWLockRelease(LogicalRepWorkerLock);
     415           0 :         return false;
     416             :     }
     417             : 
     418             :     /*
      419             :      * However, if there are no free worker slots, inform the user about it
     420             :      * before exiting.
     421             :      */
     422         604 :     if (worker == NULL)
     423             :     {
     424           0 :         LWLockRelease(LogicalRepWorkerLock);
     425           0 :         ereport(WARNING,
     426             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     427             :                  errmsg("out of logical replication worker slots"),
     428             :                  errhint("You might need to increase %s.", "max_logical_replication_workers")));
     429           0 :         return false;
     430             :     }
     431             : 
     432             :     /* Prepare the worker slot. */
     433         604 :     worker->type = wtype;
     434         604 :     worker->launch_time = now;
     435         604 :     worker->in_use = true;
     436         604 :     worker->generation++;
     437         604 :     worker->proc = NULL;
     438         604 :     worker->dbid = dbid;
     439         604 :     worker->userid = userid;
     440         604 :     worker->subid = subid;
     441         604 :     worker->relid = relid;
     442         604 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     443         604 :     worker->relstate_lsn = InvalidXLogRecPtr;
     444         604 :     worker->stream_fileset = NULL;
     445         604 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
     446         604 :     worker->parallel_apply = is_parallel_apply_worker;
     447         604 :     worker->last_lsn = InvalidXLogRecPtr;
     448         604 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     449         604 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     450         604 :     worker->reply_lsn = InvalidXLogRecPtr;
     451         604 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     452             : 
     453             :     /* Before releasing lock, remember generation for future identification. */
     454         604 :     generation = worker->generation;
     455             : 
     456         604 :     LWLockRelease(LogicalRepWorkerLock);
     457             : 
     458             :     /* Register the new dynamic worker. */
     459         604 :     memset(&bgw, 0, sizeof(bgw));
     460         604 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     461             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     462         604 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     463         604 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     464             : 
     465         604 :     switch (worker->type)
     466             :     {
     467         248 :         case WORKERTYPE_APPLY:
     468         248 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     469         248 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     470             :                      "logical replication apply worker for subscription %u",
     471             :                      subid);
     472         248 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
     473         248 :             break;
     474             : 
     475          20 :         case WORKERTYPE_PARALLEL_APPLY:
     476          20 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
     477          20 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     478             :                      "logical replication parallel apply worker for subscription %u",
     479             :                      subid);
     480          20 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
     481             : 
     482          20 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
     483          20 :             break;
     484             : 
     485         336 :         case WORKERTYPE_TABLESYNC:
     486         336 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
     487         336 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
     488             :                      "logical replication tablesync worker for subscription %u sync %u",
     489             :                      subid,
     490             :                      relid);
     491         336 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
     492         336 :             break;
     493             : 
     494           0 :         case WORKERTYPE_UNKNOWN:
     495             :             /* Should never happen. */
     496           0 :             elog(ERROR, "unknown worker type");
     497             :     }
     498             : 
     499         604 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     500         604 :     bgw.bgw_notify_pid = MyProcPid;
     501         604 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     502             : 
     503         604 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     504             :     {
     505             :         /* Failed to start worker, so clean up the worker slot. */
     506           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     507             :         Assert(generation == worker->generation);
     508           0 :         logicalrep_worker_cleanup(worker);
     509           0 :         LWLockRelease(LogicalRepWorkerLock);
     510             : 
     511           0 :         ereport(WARNING,
     512             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     513             :                  errmsg("out of background worker slots"),
     514             :                  errhint("You might need to increase %s.", "max_worker_processes")));
     515           0 :         return false;
     516             :     }
     517             : 
     518             :     /* Now wait until it attaches. */
     519         604 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     520             : }
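
A hypothetical call site for starting an apply worker (argument order as declared above; DSM_HANDLE_INVALID because only parallel apply workers pass a DSM segment):

    if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
                                  sub->dbid, sub->oid, sub->name,
                                  sub->owner, InvalidOid,
                                  DSM_HANDLE_INVALID))
    {
        /*
         * No free slot, or the worker died before attaching; the launcher
         * simply retries on a later cycle.
         */
    }
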
     521             : 
     522             : /*
     523             :  * Internal function to stop the worker and wait until it detaches from the
     524             :  * slot.
     525             :  */
     526             : static void
     527         126 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
     528             : {
     529             :     uint16      generation;
     530             : 
     531             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
     532             : 
     533             :     /*
      534             :      * Remember our worker's generation so we can later check whether what we
      535             :      * see in the slot is still the same worker.
     536             :      */
     537         126 :     generation = worker->generation;
     538             : 
     539             :     /*
     540             :      * If we found a worker but it does not have proc set then it is still
     541             :      * starting up; wait for it to finish starting and then kill it.
     542             :      */
     543         138 :     while (worker->in_use && !worker->proc)
     544             :     {
     545             :         int         rc;
     546             : 
     547          18 :         LWLockRelease(LogicalRepWorkerLock);
     548             : 
     549             :         /* Wait a bit --- we don't expect to have to wait long. */
     550          18 :         rc = WaitLatch(MyLatch,
     551             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     552             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     553             : 
     554          18 :         if (rc & WL_LATCH_SET)
     555             :         {
     556           0 :             ResetLatch(MyLatch);
     557           0 :             CHECK_FOR_INTERRUPTS();
     558             :         }
     559             : 
     560             :         /* Recheck worker status. */
     561          18 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     562             : 
     563             :         /*
     564             :          * Check whether the worker slot is no longer used, which would mean
     565             :          * that the worker has exited, or whether the worker generation is
     566             :          * different, meaning that a different worker has taken the slot.
     567             :          */
     568          18 :         if (!worker->in_use || worker->generation != generation)
     569           0 :             return;
     570             : 
     571             :         /* Worker has assigned proc, so it has started. */
     572          18 :         if (worker->proc)
     573           6 :             break;
     574             :     }
     575             : 
     576             :     /* Now terminate the worker ... */
     577         126 :     kill(worker->proc->pid, signo);
     578             : 
     579             :     /* ... and wait for it to die. */
     580             :     for (;;)
     581         156 :     {
     582             :         int         rc;
     583             : 
     584             :         /* is it gone? */
     585         282 :         if (!worker->proc || worker->generation != generation)
     586             :             break;
     587             : 
     588         156 :         LWLockRelease(LogicalRepWorkerLock);
     589             : 
     590             :         /* Wait a bit --- we don't expect to have to wait long. */
     591         156 :         rc = WaitLatch(MyLatch,
     592             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     593             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     594             : 
     595         156 :         if (rc & WL_LATCH_SET)
     596             :         {
     597          34 :             ResetLatch(MyLatch);
     598          34 :             CHECK_FOR_INTERRUPTS();
     599             :         }
     600             : 
     601         156 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     602             :     }
     603             : }
     604             : 
     605             : /*
     606             :  * Stop the logical replication worker for subid/relid, if any.
     607             :  */
     608             : void
     609         144 : logicalrep_worker_stop(Oid subid, Oid relid)
     610             : {
     611             :     LogicalRepWorker *worker;
     612             : 
     613         144 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     614             : 
     615         144 :     worker = logicalrep_worker_find(subid, relid, false);
     616             : 
     617         144 :     if (worker)
     618             :     {
     619             :         Assert(!isParallelApplyWorker(worker));
     620         108 :         logicalrep_worker_stop_internal(worker, SIGTERM);
     621             :     }
     622             : 
     623         144 :     LWLockRelease(LogicalRepWorkerLock);
     624         144 : }
     625             : 
     626             : /*
     627             :  * Stop the given logical replication parallel apply worker.
     628             :  *
      629             :  * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
     630             :  * worker so that the worker exits cleanly.
     631             :  */
     632             : void
     633          10 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
     634             : {
     635             :     int         slot_no;
     636             :     uint16      generation;
     637             :     LogicalRepWorker *worker;
     638             : 
     639          10 :     SpinLockAcquire(&winfo->shared->mutex);
     640          10 :     generation = winfo->shared->logicalrep_worker_generation;
     641          10 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
     642          10 :     SpinLockRelease(&winfo->shared->mutex);
     643             : 
     644             :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
     645             : 
     646             :     /*
     647             :      * Detach from the error_mq_handle for the parallel apply worker before
     648             :      * stopping it. This prevents the leader apply worker from trying to
     649             :      * receive the message from the error queue that might already be detached
     650             :      * by the parallel apply worker.
     651             :      */
     652          10 :     if (winfo->error_mq_handle)
     653             :     {
     654          10 :         shm_mq_detach(winfo->error_mq_handle);
     655          10 :         winfo->error_mq_handle = NULL;
     656             :     }
     657             : 
     658          10 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     659             : 
     660          10 :     worker = &LogicalRepCtx->workers[slot_no];
     661             :     Assert(isParallelApplyWorker(worker));
     662             : 
     663             :     /*
     664             :      * Only stop the worker if the generation matches and the worker is alive.
     665             :      */
     666          10 :     if (worker->generation == generation && worker->proc)
     667          10 :         logicalrep_worker_stop_internal(worker, SIGINT);
     668             : 
     669          10 :     LWLockRelease(LogicalRepWorkerLock);
     670          10 : }
     671             : 
     672             : /*
     673             :  * Wake up (using latch) any logical replication worker for specified sub/rel.
     674             :  */
     675             : void
     676         374 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     677             : {
     678             :     LogicalRepWorker *worker;
     679             : 
     680         374 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     681             : 
     682         374 :     worker = logicalrep_worker_find(subid, relid, true);
     683             : 
     684         374 :     if (worker)
     685         374 :         logicalrep_worker_wakeup_ptr(worker);
     686             : 
     687         374 :     LWLockRelease(LogicalRepWorkerLock);
     688         374 : }
     689             : 
     690             : /*
     691             :  * Wake up (using latch) the specified logical replication worker.
     692             :  *
     693             :  * Caller must hold lock, else worker->proc could change under us.
     694             :  */
     695             : void
     696        1104 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     697             : {
     698             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     699             : 
     700        1104 :     SetLatch(&worker->proc->procLatch);
     701        1104 : }
     702             : 
     703             : /*
     704             :  * Attach to a slot.
     705             :  */
     706             : void
     707         738 : logicalrep_worker_attach(int slot)
     708             : {
     709             :     /* Block concurrent access. */
     710         738 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     711             : 
     712             :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     713         738 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     714             : 
     715         738 :     if (!MyLogicalRepWorker->in_use)
     716             :     {
     717           0 :         LWLockRelease(LogicalRepWorkerLock);
     718           0 :         ereport(ERROR,
     719             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     720             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     721             :                         slot)));
     722             :     }
     723             : 
     724         738 :     if (MyLogicalRepWorker->proc)
     725             :     {
     726           0 :         LWLockRelease(LogicalRepWorkerLock);
     727           0 :         ereport(ERROR,
     728             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     729             :                  errmsg("logical replication worker slot %d is already used by "
     730             :                         "another worker, cannot attach", slot)));
     731             :     }
     732             : 
     733         738 :     MyLogicalRepWorker->proc = MyProc;
     734         738 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     735             : 
     736         738 :     LWLockRelease(LogicalRepWorkerLock);
     737         738 : }
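
On the worker side, the slot index arrives in bgw_main_arg (set by logicalrep_worker_launch() above) and the entry point attaches before doing anything else. A sketch under that assumption; the function name is illustrative, the real entry points being ApplyWorkerMain and friends:

    void
    WorkerMainSketch(Datum main_arg)
    {
        int         worker_slot = DatumGetInt32(main_arg);

        /* Attach to our slot; from here on MyLogicalRepWorker is valid. */
        logicalrep_worker_attach(worker_slot);

        /* ... worker-specific initialization and main loop ... */
    }
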
     738             : 
     739             : /*
     740             :  * Stop the parallel apply workers if any, and detach the leader apply worker
     741             :  * (cleans up the worker info).
     742             :  */
     743             : static void
     744         738 : logicalrep_worker_detach(void)
     745             : {
     746             :     /* Stop the parallel apply workers. */
     747         738 :     if (am_leader_apply_worker())
     748             :     {
     749             :         List       *workers;
     750             :         ListCell   *lc;
     751             : 
     752             :         /*
     753             :          * Detach from the error_mq_handle for all parallel apply workers
     754             :          * before terminating them. This prevents the leader apply worker from
     755             :          * receiving the worker termination message and sending it to logs
     756             :          * when the same is already done by the parallel worker.
     757             :          */
     758         378 :         pa_detach_all_error_mq();
     759             : 
     760         378 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     761             : 
     762         378 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
     763         766 :         foreach(lc, workers)
     764             :         {
     765         388 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
     766             : 
     767         388 :             if (isParallelApplyWorker(w))
     768           8 :                 logicalrep_worker_stop_internal(w, SIGTERM);
     769             :         }
     770             : 
     771         378 :         LWLockRelease(LogicalRepWorkerLock);
     772             :     }
     773             : 
     774             :     /* Block concurrent access. */
     775         738 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     776             : 
     777         738 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     778             : 
     779         738 :     LWLockRelease(LogicalRepWorkerLock);
     780         738 : }
     781             : 
     782             : /*
     783             :  * Clean up worker info.
     784             :  */
     785             : static void
     786         738 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     787             : {
     788             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     789             : 
     790         738 :     worker->type = WORKERTYPE_UNKNOWN;
     791         738 :     worker->in_use = false;
     792         738 :     worker->proc = NULL;
     793         738 :     worker->dbid = InvalidOid;
     794         738 :     worker->userid = InvalidOid;
     795         738 :     worker->subid = InvalidOid;
     796         738 :     worker->relid = InvalidOid;
     797         738 :     worker->leader_pid = InvalidPid;
     798         738 :     worker->parallel_apply = false;
     799         738 : }
     800             : 
     801             : /*
     802             :  * Cleanup function for logical replication launcher.
     803             :  *
     804             :  * Called on logical replication launcher exit.
     805             :  */
     806             : static void
     807         686 : logicalrep_launcher_onexit(int code, Datum arg)
     808             : {
     809         686 :     LogicalRepCtx->launcher_pid = 0;
     810         686 : }
     811             : 
     812             : /*
     813             :  * Cleanup function.
     814             :  *
     815             :  * Called on logical replication worker exit.
     816             :  */
     817             : static void
     818         738 : logicalrep_worker_onexit(int code, Datum arg)
     819             : {
     820             :     /* Disconnect gracefully from the remote side. */
     821         738 :     if (LogRepWorkerWalRcvConn)
     822         660 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
     823             : 
     824         738 :     logicalrep_worker_detach();
     825             : 
     826             :     /* Cleanup fileset used for streaming transactions. */
     827         738 :     if (MyLogicalRepWorker->stream_fileset != NULL)
     828          28 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     829             : 
     830             :     /*
     831             :      * Session level locks may be acquired outside of a transaction in
     832             :      * parallel apply mode and will not be released when the worker
     833             :      * terminates, so manually release all locks before the worker exits.
     834             :      *
     835             :      * The locks will be acquired once the worker is initialized.
     836             :      */
     837         738 :     if (!InitializingApplyWorker)
     838         728 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
     839             : 
     840         738 :     ApplyLauncherWakeup();
     841         738 : }
     842             : 
     843             : /*
     844             :  * Count the number of registered (not necessarily running) sync workers
     845             :  * for a subscription.
     846             :  */
     847             : int
     848        1668 : logicalrep_sync_worker_count(Oid subid)
     849             : {
     850             :     int         i;
     851        1668 :     int         res = 0;
     852             : 
     853             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     854             : 
     855             :     /* Search for attached worker for a given subscription id. */
     856        8668 :     for (i = 0; i < max_logical_replication_workers; i++)
     857             :     {
     858        7000 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     859             : 
     860        7000 :         if (isTablesyncWorker(w) && w->subid == subid)
     861        1664 :             res++;
     862             :     }
     863             : 
     864        1668 :     return res;
     865             : }
     866             : 
     867             : /*
     868             :  * Count the number of registered (but not necessarily running) parallel apply
     869             :  * workers for a subscription.
     870             :  */
     871             : static int
     872         604 : logicalrep_pa_worker_count(Oid subid)
     873             : {
     874             :     int         i;
     875         604 :     int         res = 0;
     876             : 
     877             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     878             : 
     879             :     /*
     880             :      * Scan all attached parallel apply workers, only counting those which
     881             :      * have the given subscription id.
     882             :      */
     883        3208 :     for (i = 0; i < max_logical_replication_workers; i++)
     884             :     {
     885        2604 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     886             : 
     887        2604 :         if (isParallelApplyWorker(w) && w->subid == subid)
     888           4 :             res++;
     889             :     }
     890             : 
     891         604 :     return res;
     892             : }
     893             : 
     894             : /*
     895             :  * ApplyLauncherShmemSize
     896             :  *      Compute space needed for replication launcher shared memory
     897             :  */
     898             : Size
     899        6834 : ApplyLauncherShmemSize(void)
     900             : {
     901             :     Size        size;
     902             : 
     903             :     /*
     904             :      * Need the fixed struct and the array of LogicalRepWorker.
     905             :      */
     906        6834 :     size = sizeof(LogicalRepCtxStruct);
     907        6834 :     size = MAXALIGN(size);
     908        6834 :     size = add_size(size, mul_size(max_logical_replication_workers,
     909             :                                    sizeof(LogicalRepWorker)));
     910        6834 :     return size;
     911             : }
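
ApplyLauncherShmemSize() participates in the usual two-pass shared-memory bootstrap together with ApplyLauncherShmemInit() below; a sketch of the assumed sequence (following the common ipci.c pattern):

    /* Pass 1: reserve space while sizing the main shared memory segment. */
    size = add_size(size, ApplyLauncherShmemSize());

    /* Pass 2: carve out and initialize the structure. */
    ApplyLauncherShmemInit();
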
     912             : 
     913             : /*
     914             :  * ApplyLauncherRegister
     915             :  *      Register a background worker running the logical replication launcher.
     916             :  */
     917             : void
     918        1432 : ApplyLauncherRegister(void)
     919             : {
     920             :     BackgroundWorker bgw;
     921             : 
     922             :     /*
     923             :      * The logical replication launcher is disabled during binary upgrades, to
     924             :      * prevent logical replication workers from running on the source cluster.
     925             :      * That could cause replication origins to move forward after having been
     926             :      * copied to the target cluster, potentially creating conflicts with the
     927             :      * copied data files.
     928             :      */
     929        1432 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
     930          48 :         return;
     931             : 
     932        1384 :     memset(&bgw, 0, sizeof(bgw));
     933        1384 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     934             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     935        1384 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     936        1384 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
     937        1384 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     938        1384 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     939             :              "logical replication launcher");
     940        1384 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
     941             :              "logical replication launcher");
     942        1384 :     bgw.bgw_restart_time = 5;
     943        1384 :     bgw.bgw_notify_pid = 0;
     944        1384 :     bgw.bgw_main_arg = (Datum) 0;
     945             : 
     946        1384 :     RegisterBackgroundWorker(&bgw);
     947             : }
     948             : 
     949             : /*
     950             :  * ApplyLauncherShmemInit
     951             :  *      Allocate and initialize replication launcher shared memory
     952             :  */
     953             : void
     954        1768 : ApplyLauncherShmemInit(void)
     955             : {
     956             :     bool        found;
     957             : 
     958        1768 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     959        1768 :         ShmemInitStruct("Logical Replication Launcher Data",
     960             :                         ApplyLauncherShmemSize(),
     961             :                         &found);
     962             : 
     963        1768 :     if (!found)
     964             :     {
     965             :         int         slot;
     966             : 
     967        1768 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     968             : 
     969        1768 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
     970        1768 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
     971             : 
     972             :         /* Initialize memory and spin locks for each worker slot. */
     973        8806 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
     974             :         {
     975        7038 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
     976             : 
     977        7038 :             memset(worker, 0, sizeof(LogicalRepWorker));
     978        7038 :             SpinLockInit(&worker->relmutex);
     979             :         }
     980             :     }
     981        1768 : }
     982             : 
     983             : /*
     984             :  * Initialize or attach to the dynamic shared hash table that stores the
     985             :  * last-start times, if not already done.
     986             :  * This must be called before accessing the table.
     987             :  */
     988             : static void
     989         798 : logicalrep_launcher_attach_dshmem(void)
     990             : {
     991             :     MemoryContext oldcontext;
     992             : 
     993             :     /* Quick exit if we already did this. */
     994         798 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
     995         708 :         last_start_times != NULL)
     996         512 :         return;
     997             : 
     998             :     /* Otherwise, use a lock to ensure only one process creates the table. */
     999         286 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
    1000             : 
    1001             :     /* Be sure any local memory allocated by DSA routines is persistent. */
    1002         286 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1003             : 
    1004         286 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
    1005             :     {
    1006             :         /* Initialize dynamic shared hash table for last-start times. */
    1007          90 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
    1008          90 :         dsa_pin(last_start_times_dsa);
    1009          90 :         dsa_pin_mapping(last_start_times_dsa);
    1010          90 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
    1011             : 
    1012             :         /* Store handles in shared memory for other backends to use. */
    1013          90 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
    1014          90 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
    1015             :     }
    1016         196 :     else if (!last_start_times)
    1017             :     {
    1018             :         /* Attach to existing dynamic shared hash table. */
    1019         196 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
    1020         196 :         dsa_pin_mapping(last_start_times_dsa);
    1021         196 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
    1022         196 :                                          LogicalRepCtx->last_start_dsh, 0);
    1023             :     }
    1024             : 
    1025         286 :     MemoryContextSwitchTo(oldcontext);
    1026         286 :     LWLockRelease(LogicalRepWorkerLock);
    1027             : }
    1028             : 
    1029             : /*
    1030             :  * Set the last-start time for the subscription.
    1031             :  */
    1032             : static void
    1033         248 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
    1034             : {
    1035             :     LauncherLastStartTimesEntry *entry;
    1036             :     bool        found;
    1037             : 
    1038         248 :     logicalrep_launcher_attach_dshmem();
    1039             : 
    1040         248 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
    1041         248 :     entry->last_start_time = start_time;
    1042         248 :     dshash_release_lock(last_start_times, entry);
    1043         248 : }
    1044             : 
    1045             : /*
    1046             :  * Return the last-start time for the subscription, or 0 if there isn't one.
    1047             :  */
    1048             : static TimestampTz
    1049         296 : ApplyLauncherGetWorkerStartTime(Oid subid)
    1050             : {
    1051             :     LauncherLastStartTimesEntry *entry;
    1052             :     TimestampTz ret;
    1053             : 
    1054         296 :     logicalrep_launcher_attach_dshmem();
    1055             : 
    1056         296 :     entry = dshash_find(last_start_times, &subid, false);
    1057         296 :     if (entry == NULL)
    1058         216 :         return 0;
    1059             : 
    1060          80 :     ret = entry->last_start_time;
    1061          80 :     dshash_release_lock(last_start_times, entry);
    1062             : 
    1063          80 :     return ret;
    1064             : }
    1065             : 
    1066             : /*
    1067             :  * Remove the last-start-time entry for the subscription, if one exists.
    1068             :  *
    1069             :  * This has two use-cases: to remove the entry related to a subscription
    1070             :  * that's been deleted or disabled (just to avoid leaking shared memory),
    1071             :  * and to allow immediate restart of an apply worker that has exited
    1072             :  * due to subscription parameter changes.
    1073             :  */
    1074             : void
    1075         254 : ApplyLauncherForgetWorkerStartTime(Oid subid)
    1076             : {
    1077         254 :     logicalrep_launcher_attach_dshmem();
    1078             : 
    1079         254 :     (void) dshash_delete_key(last_start_times, &subid);
    1080         254 : }
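
The three accessors above share a locking discipline that is easy to get wrong: dshash_find (with exclusive = false) returns a share-locked entry or NULL, dshash_find_or_insert returns an exclusively locked entry, both must be followed by dshash_release_lock, and dshash_delete_key locks internally and leaves nothing to release. A compact sketch covering all three, assuming a hypothetical DemoEntry keyed by Oid (with the default comparison functions, the key must sit at the start of the entry):

    typedef struct DemoEntry
    {
        Oid         key;                /* hash key; first member */
        TimestampTz last_start_time;
    } DemoEntry;

    static void
    demo_lock_discipline(dshash_table *table, Oid key, TimestampTz now)
    {
        DemoEntry  *entry;
        bool        found;

        /* Lookup: the entry, if found, comes back share-locked. */
        entry = dshash_find(table, &key, false);
        if (entry != NULL)
            dshash_release_lock(table, entry);

        /* Upsert: exclusively locked; keep the locked window short. */
        entry = dshash_find_or_insert(table, &key, &found);
        entry->last_start_time = now;
        dshash_release_lock(table, entry);

        /* Delete by key: locking handled internally, nothing to release. */
        (void) dshash_delete_key(table, &key);
    }
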
    1081             : 
    1082             : /*
     1083             :  * Wake up the launcher on commit if requested.
    1084             :  */
    1085             : void
    1086      570398 : AtEOXact_ApplyLauncher(bool isCommit)
    1087             : {
    1088      570398 :     if (isCommit)
    1089             :     {
    1090      524506 :         if (on_commit_launcher_wakeup)
    1091         232 :             ApplyLauncherWakeup();
    1092             :     }
    1093             : 
    1094      570398 :     on_commit_launcher_wakeup = false;
    1095      570398 : }
    1096             : 
    1097             : /*
    1098             :  * Request wakeup of the launcher on commit of the transaction.
    1099             :  *
     1100             :  * This is used to signal the launcher to stop sleeping and process the
     1101             :  * subscriptions when the current transaction commits.  Should be used when
     1102             :  * a new tuple has been added to the pg_subscription catalog.
     1103             :  */
    1104             : void
    1105         232 : ApplyLauncherWakeupAtCommit(void)
    1106             : {
    1107         232 :     if (!on_commit_launcher_wakeup)
    1108         232 :         on_commit_launcher_wakeup = true;
    1109         232 : }
    1110             : 
    1111             : static void
    1112         970 : ApplyLauncherWakeup(void)
    1113             : {
    1114         970 :     if (LogicalRepCtx->launcher_pid != 0)
    1115         918 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
    1116         970 : }
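
Taken together, the three functions above implement a deferred-signal pattern: a transaction that adds a row to pg_subscription merely sets a backend-local flag, and the SIGUSR1 is sent only once the commit has made the new row visible to the launcher; on abort the flag is simply cleared. A sketch of the same pattern with hypothetical Demo names (headers as in launcher.c):

    static bool pending_wakeup = false;

    void
    DemoWakeupAtCommit(void)
    {
        pending_wakeup = true;          /* remember; do not signal yet */
    }

    void
    AtEOXact_Demo(bool isCommit)
    {
        /* Signal only on commit, when the catalog change is visible. */
        if (isCommit && pending_wakeup && LogicalRepCtx->launcher_pid != 0)
            kill(LogicalRepCtx->launcher_pid, SIGUSR1);

        pending_wakeup = false;         /* reset on commit and abort alike */
    }
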
    1117             : 
    1118             : /*
    1119             :  * Main loop for the apply launcher process.
    1120             :  */
    1121             : void
    1122         686 : ApplyLauncherMain(Datum main_arg)
    1123             : {
    1124         686 :     ereport(DEBUG1,
    1125             :             (errmsg_internal("logical replication launcher started")));
    1126             : 
    1127         686 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
    1128             : 
    1129             :     Assert(LogicalRepCtx->launcher_pid == 0);
    1130         686 :     LogicalRepCtx->launcher_pid = MyProcPid;
    1131             : 
    1132             :     /* Establish signal handlers. */
    1133         686 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1134         686 :     pqsignal(SIGTERM, die);
    1135         686 :     BackgroundWorkerUnblockSignals();
    1136             : 
    1137             :     /*
    1138             :      * Establish connection to nailed catalogs (we only ever access
    1139             :      * pg_subscription).
    1140             :      */
    1141         686 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
    1142             : 
    1143             :     /* Enter main loop */
    1144             :     for (;;)
    1145        3504 :     {
    1146             :         int         rc;
    1147             :         List       *sublist;
    1148             :         ListCell   *lc;
    1149             :         MemoryContext subctx;
    1150             :         MemoryContext oldctx;
    1151        4190 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    1152             : 
    1153        4190 :         CHECK_FOR_INTERRUPTS();
    1154             : 
    1155             :         /* Use temporary context to avoid leaking memory across cycles. */
    1156        4186 :         subctx = AllocSetContextCreate(TopMemoryContext,
    1157             :                                        "Logical Replication Launcher sublist",
    1158             :                                        ALLOCSET_DEFAULT_SIZES);
    1159        4186 :         oldctx = MemoryContextSwitchTo(subctx);
    1160             : 
    1161             :         /* Start any missing workers for enabled subscriptions. */
    1162        4186 :         sublist = get_subscription_list();
    1163        4928 :         foreach(lc, sublist)
    1164             :         {
    1165         746 :             Subscription *sub = (Subscription *) lfirst(lc);
    1166             :             LogicalRepWorker *w;
    1167             :             TimestampTz last_start;
    1168             :             TimestampTz now;
    1169             :             long        elapsed;
    1170             : 
    1171         746 :             if (!sub->enabled)
    1172          58 :                 continue;
    1173             : 
    1174         688 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1175         688 :             w = logicalrep_worker_find(sub->oid, InvalidOid, false);
    1176         688 :             LWLockRelease(LogicalRepWorkerLock);
    1177             : 
    1178         688 :             if (w != NULL)
    1179         392 :                 continue;       /* worker is running already */
    1180             : 
    1181             :             /*
    1182             :              * If the worker is eligible to start now, launch it.  Otherwise,
    1183             :              * adjust wait_time so that we'll wake up as soon as it can be
    1184             :              * started.
    1185             :              *
    1186             :              * Each subscription's apply worker can only be restarted once per
    1187             :              * wal_retrieve_retry_interval, so that errors do not cause us to
    1188             :              * repeatedly restart the worker as fast as possible.  In cases
    1189             :              * where a restart is expected (e.g., subscription parameter
    1190             :              * changes), another process should remove the last-start entry
    1191             :              * for the subscription so that the worker can be restarted
    1192             :              * without waiting for wal_retrieve_retry_interval to elapse.
    1193             :              */
    1194         296 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
    1195         296 :             now = GetCurrentTimestamp();
    1196         296 :             if (last_start == 0 ||
    1197          80 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
    1198             :             {
    1199         248 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
    1200         248 :                 logicalrep_worker_launch(WORKERTYPE_APPLY,
    1201         248 :                                          sub->dbid, sub->oid, sub->name,
    1202             :                                          sub->owner, InvalidOid,
    1203             :                                          DSM_HANDLE_INVALID);
    1204             :             }
    1205             :             else
    1206             :             {
    1207          48 :                 wait_time = Min(wait_time,
    1208             :                                 wal_retrieve_retry_interval - elapsed);
    1209             :             }
    1210             :         }
    1211             : 
    1212             :         /* Switch back to original memory context. */
    1213        4182 :         MemoryContextSwitchTo(oldctx);
    1214             :         /* Clean the temporary memory. */
    1215        4182 :         MemoryContextDelete(subctx);
    1216             : 
    1217             :         /* Wait for more work. */
    1218        4182 :         rc = WaitLatch(MyLatch,
    1219             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1220             :                        wait_time,
    1221             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1222             : 
    1223        4176 :         if (rc & WL_LATCH_SET)
    1224             :         {
    1225        4156 :             ResetLatch(MyLatch);
    1226        4156 :             CHECK_FOR_INTERRUPTS();
    1227             :         }
    1228             : 
    1229        3504 :         if (ConfigReloadPending)
    1230             :         {
    1231          56 :             ConfigReloadPending = false;
    1232          56 :             ProcessConfigFile(PGC_SIGHUP);
    1233             :         }
    1234             :     }
    1235             : 
    1236             :     /* Not reachable */
    1237             : }
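
To make the throttling arithmetic in the loop concrete: with wal_retrieve_retry_interval at its default of 5000 ms and a worker that last started 1200 ms ago, elapsed = 1200 < 5000, so the launcher skips the relaunch and shortens its nap instead (values assumed for illustration; Min is PostgreSQL's macro from c.h):

    long    wait_time = DEFAULT_NAPTIME_PER_CYCLE;  /* 180000 ms */
    long    elapsed = 1200;                         /* ms since last start */

    /* default wal_retrieve_retry_interval is 5000 ms */
    wait_time = Min(wait_time, 5000 - elapsed);
    /* => Min(180000, 3800) = 3800 ms: wake exactly when eligible again */
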
    1238             : 
    1239             : /*
    1240             :  * Is current process the logical replication launcher?
    1241             :  */
    1242             : bool
    1243         722 : IsLogicalLauncher(void)
    1244             : {
    1245         722 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1246             : }
    1247             : 
    1248             : /*
     1249             :  * Return the pid of the leader apply worker if the given pid is the pid of a
     1250             :  * parallel apply worker; otherwise, return InvalidPid.
    1251             :  */
    1252             : pid_t
    1253        3978 : GetLeaderApplyWorkerPid(pid_t pid)
    1254             : {
    1255        3978 :     int         leader_pid = InvalidPid;
    1256             :     int         i;
    1257             : 
    1258        3978 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1259             : 
    1260       19890 :     for (i = 0; i < max_logical_replication_workers; i++)
    1261             :     {
    1262       15912 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
    1263             : 
    1264       15912 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
    1265             :         {
    1266           0 :             leader_pid = w->leader_pid;
    1267           0 :             break;
    1268             :         }
    1269             :     }
    1270             : 
    1271        3978 :     LWLockRelease(LogicalRepWorkerLock);
    1272             : 
    1273        3978 :     return leader_pid;
    1274             : }
    1275             : 
    1276             : /*
     1277             :  * Returns the state of the subscriptions.
    1278             :  */
    1279             : Datum
    1280           2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1281             : {
    1282             : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
    1283           2 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1284             :     int         i;
    1285           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1286             : 
    1287           2 :     InitMaterializedSRF(fcinfo, 0);
    1288             : 
    /* Make sure we get a consistent view of the workers. */
     1290           2 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1291             : 
    1292          10 :     for (i = 0; i < max_logical_replication_workers; i++)
    1293             :     {
    1294             :         /* for each row */
    1295           8 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1296           8 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
    1297             :         int         worker_pid;
    1298             :         LogicalRepWorker worker;
    1299             : 
    1300           8 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1301             :                sizeof(LogicalRepWorker));
    1302           8 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1303           4 :             continue;
    1304             : 
    1305           4 :         if (OidIsValid(subid) && worker.subid != subid)
    1306           0 :             continue;
    1307             : 
    1308           4 :         worker_pid = worker.proc->pid;
    1309             : 
    1310           4 :         values[0] = ObjectIdGetDatum(worker.subid);
    1311           4 :         if (isTablesyncWorker(&worker))
    1312           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1313             :         else
    1314           4 :             nulls[1] = true;
    1315           4 :         values[2] = Int32GetDatum(worker_pid);
    1316             : 
    1317           4 :         if (isParallelApplyWorker(&worker))
    1318           0 :             values[3] = Int32GetDatum(worker.leader_pid);
    1319             :         else
    1320           4 :             nulls[3] = true;
    1321             : 
    1322           4 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
    1323           2 :             nulls[4] = true;
    1324             :         else
    1325           2 :             values[4] = LSNGetDatum(worker.last_lsn);
    1326           4 :         if (worker.last_send_time == 0)
    1327           0 :             nulls[5] = true;
    1328             :         else
    1329           4 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
    1330           4 :         if (worker.last_recv_time == 0)
    1331           0 :             nulls[6] = true;
    1332             :         else
    1333           4 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
    1334           4 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1335           2 :             nulls[7] = true;
    1336             :         else
    1337           2 :             values[7] = LSNGetDatum(worker.reply_lsn);
    1338           4 :         if (worker.reply_time == 0)
    1339           0 :             nulls[8] = true;
    1340             :         else
    1341           4 :             values[8] = TimestampTzGetDatum(worker.reply_time);
    1342             : 
    1343           4 :         switch (worker.type)
    1344             :         {
    1345           4 :             case WORKERTYPE_APPLY:
    1346           4 :                 values[9] = CStringGetTextDatum("apply");
    1347           4 :                 break;
    1348           0 :             case WORKERTYPE_PARALLEL_APPLY:
    1349           0 :                 values[9] = CStringGetTextDatum("parallel apply");
    1350           0 :                 break;
    1351           0 :             case WORKERTYPE_TABLESYNC:
    1352           0 :                 values[9] = CStringGetTextDatum("table synchronization");
    1353           0 :                 break;
    1354           0 :             case WORKERTYPE_UNKNOWN:
    1355             :                 /* Should never happen. */
    1356           0 :                 elog(ERROR, "unknown worker type");
    1357             :         }
    1358             : 
    1359           4 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1360             :                              values, nulls);
    1361             : 
    1362             :         /*
    1363             :          * If only a single subscription was requested, and we found it,
    1364             :          * break.
    1365             :          */
    1366           4 :         if (OidIsValid(subid))
    1367           0 :             break;
    1368             :     }
    1369             : 
    1370           2 :     LWLockRelease(LogicalRepWorkerLock);
    1371             : 
    1372           2 :     return (Datum) 0;
    1373             : }
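
pg_stat_get_subscription follows PostgreSQL's materialized set-returning-function protocol: InitMaterializedSRF builds the tuplestore and result descriptor, each row is appended with tuplestore_putvalues, and the function returns (Datum) 0 because the rows travel through the tuplestore rather than the return value. A minimal sketch of that protocol (demo_srf and its two columns are hypothetical; the SQL-level CREATE FUNCTION declaration is omitted):

    #include "postgres.h"
    #include "funcapi.h"
    #include "nodes/execnodes.h"
    #include "utils/builtins.h"

    PG_FUNCTION_INFO_V1(demo_srf);

    Datum
    demo_srf(PG_FUNCTION_ARGS)
    {
        ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;

        /* Sets up rsinfo->setResult / setDesc in the per-query context. */
        InitMaterializedSRF(fcinfo, 0);

        for (int i = 0; i < 3; i++)
        {
            Datum       values[2] = {0};
            bool        nulls[2] = {0};

            values[0] = Int32GetDatum(i);
            values[1] = CStringGetTextDatum("demo");

            tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                                 values, nulls);
        }

        /* Rows are returned via the tuplestore, not the return value. */
        return (Datum) 0;
    }
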

Generated by: LCOV version 1.14