LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 303 345 87.8 %
Date: 2021-12-09 03:08:47 Functions: 23 23 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2021, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
      11             :  *    This module contains the logical replication worker launcher which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "access/heapam.h"
      21             : #include "access/htup.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/pg_subscription.h"
      26             : #include "catalog/pg_subscription_rel.h"
      27             : #include "funcapi.h"
      28             : #include "libpq/pqsignal.h"
      29             : #include "miscadmin.h"
      30             : #include "pgstat.h"
      31             : #include "postmaster/bgworker.h"
      32             : #include "postmaster/fork_process.h"
      33             : #include "postmaster/interrupt.h"
      34             : #include "postmaster/postmaster.h"
      35             : #include "replication/logicallauncher.h"
      36             : #include "replication/logicalworker.h"
      37             : #include "replication/slot.h"
      38             : #include "replication/walreceiver.h"
      39             : #include "replication/worker_internal.h"
      40             : #include "storage/ipc.h"
      41             : #include "storage/proc.h"
      42             : #include "storage/procarray.h"
      43             : #include "storage/procsignal.h"
      44             : #include "tcop/tcopprot.h"
      45             : #include "utils/memutils.h"
      46             : #include "utils/pg_lsn.h"
      47             : #include "utils/ps_status.h"
      48             : #include "utils/snapmgr.h"
      49             : #include "utils/timeout.h"
      50             : 
      51             : /* max sleep time between cycles (3min) */
      52             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      53             : 
      54             : int         max_logical_replication_workers = 4;
      55             : int         max_sync_workers_per_subscription = 2;
      56             : 
      57             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      58             : 
      59             : typedef struct LogicalRepCtxStruct
      60             : {
      61             :     /* Supervisor process. */
      62             :     pid_t       launcher_pid;
      63             : 
      64             :     /* Background workers. */
      65             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      66             : } LogicalRepCtxStruct;
      67             : 
      68             : LogicalRepCtxStruct *LogicalRepCtx;
      69             : 
      70             : static void ApplyLauncherWakeup(void);
      71             : static void logicalrep_launcher_onexit(int code, Datum arg);
      72             : static void logicalrep_worker_onexit(int code, Datum arg);
      73             : static void logicalrep_worker_detach(void);
      74             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
      75             : 
      76             : static bool on_commit_launcher_wakeup = false;
      77             : 
      78             : Datum       pg_stat_get_subscription(PG_FUNCTION_ARGS);
      79             : 
      80             : 
      81             : /*
      82             :  * Load the list of subscriptions.
      83             :  *
      84             :  * Only the fields interesting for worker start/stop functions are filled for
      85             :  * each subscription.
      86             :  */
      87             : static List *
      88        2506 : get_subscription_list(void)
      89             : {
      90        2506 :     List       *res = NIL;
      91             :     Relation    rel;
      92             :     TableScanDesc scan;
      93             :     HeapTuple   tup;
      94             :     MemoryContext resultcxt;
      95             : 
      96             :     /* This is the context that we will allocate our output data in */
      97        2506 :     resultcxt = CurrentMemoryContext;
      98             : 
      99             :     /*
     100             :      * Start a transaction so we can access pg_database, and get a snapshot.
     101             :      * We don't have a use for the snapshot itself, but we're interested in
     102             :      * the secondary effect that it sets RecentGlobalXmin.  (This is critical
     103             :      * for anything that reads heap pages, because HOT may decide to prune
     104             :      * them even if the process doesn't attempt to modify any tuples.)
     105             :      *
     106             :      * FIXME: This comment is inaccurate / the code buggy. A snapshot that is
     107             :      * not pushed/active does not reliably prevent HOT pruning (->xmin could
     108             :      * e.g. be cleared when cache invalidations are processed).
     109             :      */
     110        2506 :     StartTransactionCommand();
     111        2506 :     (void) GetTransactionSnapshot();
     112             : 
     113        2506 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     114        2506 :     scan = table_beginscan_catalog(rel, 0, NULL);
     115             : 
     116        2750 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     117             :     {
     118         244 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     119             :         Subscription *sub;
     120             :         MemoryContext oldcxt;
     121             : 
     122             :         /*
     123             :          * Allocate our results in the caller's context, not the
     124             :          * transaction's. We do this inside the loop, and restore the original
     125             :          * context at the end, so that leaky things like heap_getnext() are
     126             :          * not called in a potentially long-lived context.
     127             :          */
     128         244 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     129             : 
     130         244 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     131         244 :         sub->oid = subform->oid;
     132         244 :         sub->dbid = subform->subdbid;
     133         244 :         sub->owner = subform->subowner;
     134         244 :         sub->enabled = subform->subenabled;
     135         244 :         sub->name = pstrdup(NameStr(subform->subname));
     136             :         /* We don't fill fields we are not interested in. */
     137             : 
     138         244 :         res = lappend(res, sub);
     139         244 :         MemoryContextSwitchTo(oldcxt);
     140             :     }
     141             : 
     142        2506 :     table_endscan(scan);
     143        2506 :     table_close(rel, AccessShareLock);
     144             : 
     145        2506 :     CommitTransactionCommand();
     146             : 
     147        2506 :     return res;
     148             : }
     149             : 
     150             : /*
     151             :  * Wait for a background worker to start up and attach to the shmem context.
     152             :  *
     153             :  * This is only needed for cleaning up the shared memory in case the worker
     154             :  * fails to attach.
     155             :  */
     156             : static void
     157        1494 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     158             :                                uint16 generation,
     159             :                                BackgroundWorkerHandle *handle)
     160             : {
     161             :     BgwHandleStatus status;
     162             :     int         rc;
     163             : 
     164             :     for (;;)
     165        1218 :     {
     166             :         pid_t       pid;
     167             : 
     168        1494 :         CHECK_FOR_INTERRUPTS();
     169             : 
     170        1494 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     171             : 
     172             :         /* Worker either died or has started; no need to do anything. */
     173        1494 :         if (!worker->in_use || worker->proc)
     174             :         {
     175         276 :             LWLockRelease(LogicalRepWorkerLock);
     176         276 :             return;
     177             :         }
     178             : 
     179        1218 :         LWLockRelease(LogicalRepWorkerLock);
     180             : 
     181             :         /* Check if worker has died before attaching, and clean up after it. */
     182        1218 :         status = GetBackgroundWorkerPid(handle, &pid);
     183             : 
     184        1218 :         if (status == BGWH_STOPPED)
     185             :         {
     186           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     187             :             /* Ensure that this was indeed the worker we waited for. */
     188           0 :             if (generation == worker->generation)
     189           0 :                 logicalrep_worker_cleanup(worker);
     190           0 :             LWLockRelease(LogicalRepWorkerLock);
     191           0 :             return;
     192             :         }
     193             : 
     194             :         /*
     195             :          * We need timeout because we generally don't get notified via latch
     196             :          * about the worker attach.  But we don't expect to have to wait long.
     197             :          */
     198        1218 :         rc = WaitLatch(MyLatch,
     199             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     200             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     201             : 
     202        1218 :         if (rc & WL_LATCH_SET)
     203             :         {
     204         384 :             ResetLatch(MyLatch);
     205         384 :             CHECK_FOR_INTERRUPTS();
     206             :         }
     207             :     }
     208             : }
     209             : 
     210             : /*
     211             :  * Walks the workers array and searches for one that matches given
     212             :  * subscription id and relid.
     213             :  */
     214             : LogicalRepWorker *
     215        1964 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     216             : {
     217             :     int         i;
     218        1964 :     LogicalRepWorker *res = NULL;
     219             : 
     220             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     221             : 
     222             :     /* Search for attached worker for a given subscription id. */
     223        6312 :     for (i = 0; i < max_logical_replication_workers; i++)
     224             :     {
     225        5480 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     226             : 
     227        5480 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     228        1132 :             (!only_running || w->proc))
     229             :         {
     230        1132 :             res = w;
     231        1132 :             break;
     232             :         }
     233             :     }
     234             : 
     235        1964 :     return res;
     236             : }
     237             : 
     238             : /*
     239             :  * Similar to logicalrep_worker_find(), but returns list of all workers for
     240             :  * the subscription, instead just one.
     241             :  */
     242             : List *
     243          70 : logicalrep_workers_find(Oid subid, bool only_running)
     244             : {
     245             :     int         i;
     246          70 :     List       *res = NIL;
     247             : 
     248             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     249             : 
     250             :     /* Search for attached worker for a given subscription id. */
     251         354 :     for (i = 0; i < max_logical_replication_workers; i++)
     252             :     {
     253         284 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     254             : 
     255         284 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     256          42 :             res = lappend(res, w);
     257             :     }
     258             : 
     259          70 :     return res;
     260             : }
     261             : 
     262             : /*
     263             :  * Start new apply background worker, if possible.
     264             :  */
     265             : void
     266         276 : logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
     267             :                          Oid relid)
     268             : {
     269             :     BackgroundWorker bgw;
     270             :     BackgroundWorkerHandle *bgw_handle;
     271             :     uint16      generation;
     272             :     int         i;
     273         276 :     int         slot = 0;
     274         276 :     LogicalRepWorker *worker = NULL;
     275             :     int         nsyncworkers;
     276             :     TimestampTz now;
     277             : 
     278         276 :     ereport(DEBUG1,
     279             :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
     280             :                              subname)));
     281             : 
     282             :     /* Report this after the initial starting message for consistency. */
     283         276 :     if (max_replication_slots == 0)
     284           0 :         ereport(ERROR,
     285             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     286             :                  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
     287             : 
     288             :     /*
     289             :      * We need to do the modification of the shared memory under lock so that
     290             :      * we have consistent view.
     291             :      */
     292         276 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     293             : 
     294         276 : retry:
     295             :     /* Find unused worker slot. */
     296         524 :     for (i = 0; i < max_logical_replication_workers; i++)
     297             :     {
     298         524 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     299             : 
     300         524 :         if (!w->in_use)
     301             :         {
     302         276 :             worker = w;
     303         276 :             slot = i;
     304         276 :             break;
     305             :         }
     306             :     }
     307             : 
     308         276 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     309             : 
     310         276 :     now = GetCurrentTimestamp();
     311             : 
     312             :     /*
     313             :      * If we didn't find a free slot, try to do garbage collection.  The
     314             :      * reason we do this is because if some worker failed to start up and its
     315             :      * parent has crashed while waiting, the in_use state was never cleared.
     316             :      */
     317         276 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     318             :     {
     319           0 :         bool        did_cleanup = false;
     320             : 
     321           0 :         for (i = 0; i < max_logical_replication_workers; i++)
     322             :         {
     323           0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     324             : 
     325             :             /*
     326             :              * If the worker was marked in use but didn't manage to attach in
     327             :              * time, clean it up.
     328             :              */
     329           0 :             if (w->in_use && !w->proc &&
     330           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     331             :                                            wal_receiver_timeout))
     332             :             {
     333           0 :                 elog(WARNING,
     334             :                      "logical replication worker for subscription %u took too long to start; canceled",
     335             :                      w->subid);
     336             : 
     337           0 :                 logicalrep_worker_cleanup(w);
     338           0 :                 did_cleanup = true;
     339             :             }
     340             :         }
     341             : 
     342           0 :         if (did_cleanup)
     343           0 :             goto retry;
     344             :     }
     345             : 
     346             :     /*
     347             :      * If we reached the sync worker limit per subscription, just exit
     348             :      * silently as we might get here because of an otherwise harmless race
     349             :      * condition.
     350             :      */
     351         276 :     if (nsyncworkers >= max_sync_workers_per_subscription)
     352             :     {
     353           0 :         LWLockRelease(LogicalRepWorkerLock);
     354           0 :         return;
     355             :     }
     356             : 
     357             :     /*
     358             :      * However if there are no more free worker slots, inform user about it
     359             :      * before exiting.
     360             :      */
     361         276 :     if (worker == NULL)
     362             :     {
     363           0 :         LWLockRelease(LogicalRepWorkerLock);
     364           0 :         ereport(WARNING,
     365             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     366             :                  errmsg("out of logical replication worker slots"),
     367             :                  errhint("You might need to increase max_logical_replication_workers.")));
     368           0 :         return;
     369             :     }
     370             : 
     371             :     /* Prepare the worker slot. */
     372         276 :     worker->launch_time = now;
     373         276 :     worker->in_use = true;
     374         276 :     worker->generation++;
     375         276 :     worker->proc = NULL;
     376         276 :     worker->dbid = dbid;
     377         276 :     worker->userid = userid;
     378         276 :     worker->subid = subid;
     379         276 :     worker->relid = relid;
     380         276 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     381         276 :     worker->relstate_lsn = InvalidXLogRecPtr;
     382         276 :     worker->stream_fileset = NULL;
     383         276 :     worker->last_lsn = InvalidXLogRecPtr;
     384         276 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     385         276 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     386         276 :     worker->reply_lsn = InvalidXLogRecPtr;
     387         276 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     388             : 
     389             :     /* Before releasing lock, remember generation for future identification. */
     390         276 :     generation = worker->generation;
     391             : 
     392         276 :     LWLockRelease(LogicalRepWorkerLock);
     393             : 
     394             :     /* Register the new dynamic worker. */
     395         276 :     memset(&bgw, 0, sizeof(bgw));
     396         276 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     397             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     398         276 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     399         276 :     snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
     400         276 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     401         276 :     if (OidIsValid(relid))
     402         170 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
     403             :                  "logical replication worker for subscription %u sync %u", subid, relid);
     404             :     else
     405         106 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
     406             :                  "logical replication worker for subscription %u", subid);
     407         276 :     snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
     408             : 
     409         276 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     410         276 :     bgw.bgw_notify_pid = MyProcPid;
     411         276 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     412             : 
     413         276 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     414             :     {
     415             :         /* Failed to start worker, so clean up the worker slot. */
     416           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     417             :         Assert(generation == worker->generation);
     418           0 :         logicalrep_worker_cleanup(worker);
     419           0 :         LWLockRelease(LogicalRepWorkerLock);
     420             : 
     421           0 :         ereport(WARNING,
     422             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     423             :                  errmsg("out of background worker slots"),
     424             :                  errhint("You might need to increase max_worker_processes.")));
     425           0 :         return;
     426             :     }
     427             : 
     428             :     /* Now wait until it attaches. */
     429         276 :     WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     430             : }
     431             : 
     432             : /*
     433             :  * Stop the logical replication worker for subid/relid, if any, and wait until
     434             :  * it detaches from the slot.
     435             :  */
     436             : void
     437          70 : logicalrep_worker_stop(Oid subid, Oid relid)
     438             : {
     439             :     LogicalRepWorker *worker;
     440             :     uint16      generation;
     441             : 
     442          70 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     443             : 
     444          70 :     worker = logicalrep_worker_find(subid, relid, false);
     445             : 
     446             :     /* No worker, nothing to do. */
     447          70 :     if (!worker)
     448             :     {
     449          28 :         LWLockRelease(LogicalRepWorkerLock);
     450          28 :         return;
     451             :     }
     452             : 
     453             :     /*
     454             :      * Remember which generation was our worker so we can check if what we see
     455             :      * is still the same one.
     456             :      */
     457          42 :     generation = worker->generation;
     458             : 
     459             :     /*
     460             :      * If we found a worker but it does not have proc set then it is still
     461             :      * starting up; wait for it to finish starting and then kill it.
     462             :      */
     463          48 :     while (worker->in_use && !worker->proc)
     464             :     {
     465             :         int         rc;
     466             : 
     467          10 :         LWLockRelease(LogicalRepWorkerLock);
     468             : 
     469             :         /* Wait a bit --- we don't expect to have to wait long. */
     470          10 :         rc = WaitLatch(MyLatch,
     471             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     472             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     473             : 
     474          10 :         if (rc & WL_LATCH_SET)
     475             :         {
     476           2 :             ResetLatch(MyLatch);
     477           2 :             CHECK_FOR_INTERRUPTS();
     478             :         }
     479             : 
     480             :         /* Recheck worker status. */
     481          10 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     482             : 
     483             :         /*
     484             :          * Check whether the worker slot is no longer used, which would mean
     485             :          * that the worker has exited, or whether the worker generation is
     486             :          * different, meaning that a different worker has taken the slot.
     487             :          */
     488          10 :         if (!worker->in_use || worker->generation != generation)
     489             :         {
     490           0 :             LWLockRelease(LogicalRepWorkerLock);
     491           0 :             return;
     492             :         }
     493             : 
     494             :         /* Worker has assigned proc, so it has started. */
     495          10 :         if (worker->proc)
     496           4 :             break;
     497             :     }
     498             : 
     499             :     /* Now terminate the worker ... */
     500          42 :     kill(worker->proc->pid, SIGTERM);
     501             : 
     502             :     /* ... and wait for it to die. */
     503             :     for (;;)
     504          44 :     {
     505             :         int         rc;
     506             : 
     507             :         /* is it gone? */
     508          86 :         if (!worker->proc || worker->generation != generation)
     509             :             break;
     510             : 
     511          44 :         LWLockRelease(LogicalRepWorkerLock);
     512             : 
     513             :         /* Wait a bit --- we don't expect to have to wait long. */
     514          44 :         rc = WaitLatch(MyLatch,
     515             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     516             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     517             : 
     518          44 :         if (rc & WL_LATCH_SET)
     519             :         {
     520           4 :             ResetLatch(MyLatch);
     521           4 :             CHECK_FOR_INTERRUPTS();
     522             :         }
     523             : 
     524          44 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     525             :     }
     526             : 
     527          42 :     LWLockRelease(LogicalRepWorkerLock);
     528             : }
     529             : 
     530             : /*
     531             :  * Wake up (using latch) any logical replication worker for specified sub/rel.
     532             :  */
     533             : void
     534         166 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     535             : {
     536             :     LogicalRepWorker *worker;
     537             : 
     538         166 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     539             : 
     540         166 :     worker = logicalrep_worker_find(subid, relid, true);
     541             : 
     542         166 :     if (worker)
     543         166 :         logicalrep_worker_wakeup_ptr(worker);
     544             : 
     545         166 :     LWLockRelease(LogicalRepWorkerLock);
     546         166 : }
     547             : 
     548             : /*
     549             :  * Wake up (using latch) the specified logical replication worker.
     550             :  *
     551             :  * Caller must hold lock, else worker->proc could change under us.
     552             :  */
     553             : void
     554         494 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     555             : {
     556             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     557             : 
     558         494 :     SetLatch(&worker->proc->procLatch);
     559         494 : }
     560             : 
     561             : /*
     562             :  * Attach to a slot.
     563             :  */
     564             : void
     565         306 : logicalrep_worker_attach(int slot)
     566             : {
     567             :     /* Block concurrent access. */
     568         306 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     569             : 
     570             :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     571         306 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     572             : 
     573         306 :     if (!MyLogicalRepWorker->in_use)
     574             :     {
     575           0 :         LWLockRelease(LogicalRepWorkerLock);
     576           0 :         ereport(ERROR,
     577             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     578             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     579             :                         slot)));
     580             :     }
     581             : 
     582         306 :     if (MyLogicalRepWorker->proc)
     583             :     {
     584           0 :         LWLockRelease(LogicalRepWorkerLock);
     585           0 :         ereport(ERROR,
     586             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     587             :                  errmsg("logical replication worker slot %d is already used by "
     588             :                         "another worker, cannot attach", slot)));
     589             :     }
     590             : 
     591         306 :     MyLogicalRepWorker->proc = MyProc;
     592         306 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     593             : 
     594         306 :     LWLockRelease(LogicalRepWorkerLock);
     595         306 : }
     596             : 
     597             : /*
     598             :  * Detach the worker (cleans up the worker info).
     599             :  */
     600             : static void
     601         306 : logicalrep_worker_detach(void)
     602             : {
     603             :     /* Block concurrent access. */
     604         306 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     605             : 
     606         306 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     607             : 
     608         306 :     LWLockRelease(LogicalRepWorkerLock);
     609         306 : }
     610             : 
     611             : /*
     612             :  * Clean up worker info.
     613             :  */
     614             : static void
     615         306 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     616             : {
     617             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     618             : 
     619         306 :     worker->in_use = false;
     620         306 :     worker->proc = NULL;
     621         306 :     worker->dbid = InvalidOid;
     622         306 :     worker->userid = InvalidOid;
     623         306 :     worker->subid = InvalidOid;
     624         306 :     worker->relid = InvalidOid;
     625         306 : }
     626             : 
     627             : /*
     628             :  * Cleanup function for logical replication launcher.
     629             :  *
     630             :  * Called on logical replication launcher exit.
     631             :  */
     632             : static void
     633         504 : logicalrep_launcher_onexit(int code, Datum arg)
     634             : {
     635         504 :     LogicalRepCtx->launcher_pid = 0;
     636         504 : }
     637             : 
     638             : /*
     639             :  * Cleanup function.
     640             :  *
     641             :  * Called on logical replication worker exit.
     642             :  */
     643             : static void
     644         306 : logicalrep_worker_onexit(int code, Datum arg)
     645             : {
     646             :     /* Disconnect gracefully from the remote side. */
     647         306 :     if (LogRepWorkerWalRcvConn)
     648         296 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
     649             : 
     650         306 :     logicalrep_worker_detach();
     651             : 
     652             :     /* Cleanup fileset used for streaming transactions. */
     653         306 :     if (MyLogicalRepWorker->stream_fileset != NULL)
     654          18 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
     655             : 
     656         306 :     ApplyLauncherWakeup();
     657         306 : }
     658             : 
     659             : /*
     660             :  * Count the number of registered (not necessarily running) sync workers
     661             :  * for a subscription.
     662             :  */
     663             : int
     664         974 : logicalrep_sync_worker_count(Oid subid)
     665             : {
     666             :     int         i;
     667         974 :     int         res = 0;
     668             : 
     669             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     670             : 
     671             :     /* Search for attached worker for a given subscription id. */
     672        4918 :     for (i = 0; i < max_logical_replication_workers; i++)
     673             :     {
     674        3944 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     675             : 
     676        3944 :         if (w->subid == subid && OidIsValid(w->relid))
     677        1196 :             res++;
     678             :     }
     679             : 
     680         974 :     return res;
     681             : }
     682             : 
     683             : /*
     684             :  * ApplyLauncherShmemSize
     685             :  *      Compute space needed for replication launcher shared memory
     686             :  */
     687             : Size
     688        9630 : ApplyLauncherShmemSize(void)
     689             : {
     690             :     Size        size;
     691             : 
     692             :     /*
     693             :      * Need the fixed struct and the array of LogicalRepWorker.
     694             :      */
     695        9630 :     size = sizeof(LogicalRepCtxStruct);
     696        9630 :     size = MAXALIGN(size);
     697        9630 :     size = add_size(size, mul_size(max_logical_replication_workers,
     698             :                                    sizeof(LogicalRepWorker)));
     699        9630 :     return size;
     700             : }
     701             : 
     702             : /*
     703             :  * ApplyLauncherRegister
     704             :  *      Register a background worker running the logical replication launcher.
     705             :  */
     706             : void
     707         950 : ApplyLauncherRegister(void)
     708             : {
     709             :     BackgroundWorker bgw;
     710             : 
     711         950 :     if (max_logical_replication_workers == 0)
     712           0 :         return;
     713             : 
     714         950 :     memset(&bgw, 0, sizeof(bgw));
     715         950 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     716             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     717         950 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     718         950 :     snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
     719         950 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     720         950 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     721             :              "logical replication launcher");
     722         950 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
     723             :              "logical replication launcher");
     724         950 :     bgw.bgw_restart_time = 5;
     725         950 :     bgw.bgw_notify_pid = 0;
     726         950 :     bgw.bgw_main_arg = (Datum) 0;
     727             : 
     728         950 :     RegisterBackgroundWorker(&bgw);
     729             : }
     730             : 
     731             : /*
     732             :  * ApplyLauncherShmemInit
     733             :  *      Allocate and initialize replication launcher shared memory
     734             :  */
     735             : void
     736        2894 : ApplyLauncherShmemInit(void)
     737             : {
     738             :     bool        found;
     739             : 
     740        2894 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     741        2894 :         ShmemInitStruct("Logical Replication Launcher Data",
     742             :                         ApplyLauncherShmemSize(),
     743             :                         &found);
     744             : 
     745        2894 :     if (!found)
     746             :     {
     747             :         int         slot;
     748             : 
     749        2894 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     750             : 
     751             :         /* Initialize memory and spin locks for each worker slot. */
     752       14474 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
     753             :         {
     754       11580 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
     755             : 
     756       11580 :             memset(worker, 0, sizeof(LogicalRepWorker));
     757       11580 :             SpinLockInit(&worker->relmutex);
     758             :         }
     759             :     }
     760        2894 : }
     761             : 
     762             : /*
     763             :  * Wakeup the launcher on commit if requested.
     764             :  */
     765             : void
     766      731326 : AtEOXact_ApplyLauncher(bool isCommit)
     767             : {
     768      731326 :     if (isCommit)
     769             :     {
     770      707098 :         if (on_commit_launcher_wakeup)
     771          94 :             ApplyLauncherWakeup();
     772             :     }
     773             : 
     774      731326 :     on_commit_launcher_wakeup = false;
     775      731326 : }
     776             : 
     777             : /*
     778             :  * Request wakeup of the launcher on commit of the transaction.
     779             :  *
     780             :  * This is used to send launcher signal to stop sleeping and process the
     781             :  * subscriptions when current transaction commits. Should be used when new
     782             :  * tuple was added to the pg_subscription catalog.
     783             : */
     784             : void
     785          94 : ApplyLauncherWakeupAtCommit(void)
     786             : {
     787          94 :     if (!on_commit_launcher_wakeup)
     788          94 :         on_commit_launcher_wakeup = true;
     789          94 : }
     790             : 
     791             : static void
     792         400 : ApplyLauncherWakeup(void)
     793             : {
     794         400 :     if (LogicalRepCtx->launcher_pid != 0)
     795         380 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
     796         400 : }
     797             : 
     798             : /*
     799             :  * Main loop for the apply launcher process.
     800             :  */
     801             : void
     802         504 : ApplyLauncherMain(Datum main_arg)
     803             : {
     804         504 :     TimestampTz last_start_time = 0;
     805             : 
     806         504 :     ereport(DEBUG1,
     807             :             (errmsg_internal("logical replication launcher started")));
     808             : 
     809         504 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
     810             : 
     811             :     Assert(LogicalRepCtx->launcher_pid == 0);
     812         504 :     LogicalRepCtx->launcher_pid = MyProcPid;
     813             : 
     814             :     /* Establish signal handlers. */
     815         504 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     816         504 :     pqsignal(SIGTERM, die);
     817         504 :     BackgroundWorkerUnblockSignals();
     818             : 
     819             :     /*
     820             :      * Establish connection to nailed catalogs (we only ever access
     821             :      * pg_subscription).
     822             :      */
     823         504 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
     824             : 
     825             :     /* Enter main loop */
     826             :     for (;;)
     827        2142 :     {
     828             :         int         rc;
     829             :         List       *sublist;
     830             :         ListCell   *lc;
     831             :         MemoryContext subctx;
     832             :         MemoryContext oldctx;
     833             :         TimestampTz now;
     834        2646 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
     835             : 
     836        2646 :         CHECK_FOR_INTERRUPTS();
     837             : 
     838        2646 :         now = GetCurrentTimestamp();
     839             : 
     840             :         /* Limit the start retry to once a wal_retrieve_retry_interval */
     841        2646 :         if (TimestampDifferenceExceeds(last_start_time, now,
     842             :                                        wal_retrieve_retry_interval))
     843             :         {
     844             :             /* Use temporary context for the database list and worker info. */
     845        2506 :             subctx = AllocSetContextCreate(TopMemoryContext,
     846             :                                            "Logical Replication Launcher sublist",
     847             :                                            ALLOCSET_DEFAULT_SIZES);
     848        2506 :             oldctx = MemoryContextSwitchTo(subctx);
     849             : 
     850             :             /* search for subscriptions to start or stop. */
     851        2506 :             sublist = get_subscription_list();
     852             : 
     853             :             /* Start the missing workers for enabled subscriptions. */
     854        2750 :             foreach(lc, sublist)
     855             :             {
     856         244 :                 Subscription *sub = (Subscription *) lfirst(lc);
     857             :                 LogicalRepWorker *w;
     858             : 
     859         244 :                 if (!sub->enabled)
     860          14 :                     continue;
     861             : 
     862         230 :                 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     863         230 :                 w = logicalrep_worker_find(sub->oid, InvalidOid, false);
     864         230 :                 LWLockRelease(LogicalRepWorkerLock);
     865             : 
     866         230 :                 if (w == NULL)
     867             :                 {
     868         106 :                     last_start_time = now;
     869         106 :                     wait_time = wal_retrieve_retry_interval;
     870             : 
     871         106 :                     logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
     872             :                                              sub->owner, InvalidOid);
     873             :                 }
     874             :             }
     875             : 
     876             :             /* Switch back to original memory context. */
     877        2506 :             MemoryContextSwitchTo(oldctx);
     878             :             /* Clean the temporary memory. */
     879        2506 :             MemoryContextDelete(subctx);
     880             :         }
     881             :         else
     882             :         {
     883             :             /*
     884             :              * The wait in previous cycle was interrupted in less than
     885             :              * wal_retrieve_retry_interval since last worker was started, this
     886             :              * usually means crash of the worker, so we should retry in
     887             :              * wal_retrieve_retry_interval again.
     888             :              */
     889         140 :             wait_time = wal_retrieve_retry_interval;
     890             :         }
     891             : 
     892             :         /* Wait for more work. */
     893        2646 :         rc = WaitLatch(MyLatch,
     894             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     895             :                        wait_time,
     896             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
     897             : 
     898        2640 :         if (rc & WL_LATCH_SET)
     899             :         {
     900        2564 :             ResetLatch(MyLatch);
     901        2564 :             CHECK_FOR_INTERRUPTS();
     902             :         }
     903             : 
     904        2142 :         if (ConfigReloadPending)
     905             :         {
     906          34 :             ConfigReloadPending = false;
     907          34 :             ProcessConfigFile(PGC_SIGHUP);
     908             :         }
     909             :     }
     910             : 
     911             :     /* Not reachable */
     912             : }
     913             : 
     914             : /*
     915             :  * Is current process the logical replication launcher?
     916             :  */
     917             : bool
     918         508 : IsLogicalLauncher(void)
     919             : {
     920         508 :     return LogicalRepCtx->launcher_pid == MyProcPid;
     921             : }
     922             : 
     923             : /*
     924             :  * Returns state of the subscriptions.
     925             :  */
     926             : Datum
     927           2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
     928             : {
     929             : #define PG_STAT_GET_SUBSCRIPTION_COLS   8
     930           2 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
     931             :     int         i;
     932           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     933             :     TupleDesc   tupdesc;
     934             :     Tuplestorestate *tupstore;
     935             :     MemoryContext per_query_ctx;
     936             :     MemoryContext oldcontext;
     937             : 
     938             :     /* check to see if caller supports us returning a tuplestore */
     939           2 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     940           0 :         ereport(ERROR,
     941             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     942             :                  errmsg("set-valued function called in context that cannot accept a set")));
     943           2 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     944           0 :         ereport(ERROR,
     945             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     946             :                  errmsg("materialize mode required, but it is not allowed in this context")));
     947             : 
     948             :     /* Build a tuple descriptor for our result type */
     949           2 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     950           0 :         elog(ERROR, "return type must be a row type");
     951             : 
     952           2 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
     953           2 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
     954             : 
     955           2 :     tupstore = tuplestore_begin_heap(true, false, work_mem);
     956           2 :     rsinfo->returnMode = SFRM_Materialize;
     957           2 :     rsinfo->setResult = tupstore;
     958           2 :     rsinfo->setDesc = tupdesc;
     959             : 
     960           2 :     MemoryContextSwitchTo(oldcontext);
     961             : 
     962             :     /* Make sure we get consistent view of the workers. */
     963           2 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     964             : 
     965          12 :     for (i = 0; i <= max_logical_replication_workers; i++)
     966             :     {
     967             :         /* for each row */
     968             :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS];
     969             :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
     970             :         int         worker_pid;
     971             :         LogicalRepWorker worker;
     972             : 
     973          10 :         memcpy(&worker, &LogicalRepCtx->workers[i],
     974             :                sizeof(LogicalRepWorker));
     975          10 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
     976           6 :             continue;
     977             : 
     978           4 :         if (OidIsValid(subid) && worker.subid != subid)
     979           0 :             continue;
     980             : 
     981           4 :         worker_pid = worker.proc->pid;
     982             : 
     983          36 :         MemSet(values, 0, sizeof(values));
     984           8 :         MemSet(nulls, 0, sizeof(nulls));
     985             : 
     986           4 :         values[0] = ObjectIdGetDatum(worker.subid);
     987           4 :         if (OidIsValid(worker.relid))
     988           0 :             values[1] = ObjectIdGetDatum(worker.relid);
     989             :         else
     990           4 :             nulls[1] = true;
     991           4 :         values[2] = Int32GetDatum(worker_pid);
     992           4 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
     993           2 :             nulls[3] = true;
     994             :         else
     995           2 :             values[3] = LSNGetDatum(worker.last_lsn);
     996           4 :         if (worker.last_send_time == 0)
     997           0 :             nulls[4] = true;
     998             :         else
     999           4 :             values[4] = TimestampTzGetDatum(worker.last_send_time);
    1000           4 :         if (worker.last_recv_time == 0)
    1001           0 :             nulls[5] = true;
    1002             :         else
    1003           4 :             values[5] = TimestampTzGetDatum(worker.last_recv_time);
    1004           4 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1005           2 :             nulls[6] = true;
    1006             :         else
    1007           2 :             values[6] = LSNGetDatum(worker.reply_lsn);
    1008           4 :         if (worker.reply_time == 0)
    1009           0 :             nulls[7] = true;
    1010             :         else
    1011           4 :             values[7] = TimestampTzGetDatum(worker.reply_time);
    1012             : 
    1013           4 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    1014             : 
    1015             :         /*
    1016             :          * If only a single subscription was requested, and we found it,
    1017             :          * break.
    1018             :          */
    1019           4 :         if (OidIsValid(subid))
    1020           0 :             break;
    1021             :     }
    1022             : 
    1023           2 :     LWLockRelease(LogicalRepWorkerLock);
    1024             : 
    1025             :     /* clean up and return the tuplestore */
    1026             :     tuplestore_donestoring(tupstore);
    1027             : 
    1028           2 :     return (Datum) 0;
    1029             : }

Generated by: LCOV version 1.14