LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions)
Test:    PostgreSQL 13devel
Date:    2019-11-13 21:06:57
Coverage: Lines: 330 of 386 hit (85.5 %) | Functions: 27 of 27 hit (100.0 %)
Legend:  Lines: hit / not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2019, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
      11             :  *    This module contains the logical replication worker launcher which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "access/heapam.h"
      21             : #include "access/htup.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/pg_subscription.h"
      26             : #include "catalog/pg_subscription_rel.h"
      27             : #include "funcapi.h"
      28             : #include "libpq/pqsignal.h"
      29             : #include "miscadmin.h"
      30             : #include "pgstat.h"
      31             : #include "postmaster/bgworker.h"
      32             : #include "postmaster/fork_process.h"
      33             : #include "postmaster/postmaster.h"
      34             : #include "replication/logicallauncher.h"
      35             : #include "replication/logicalworker.h"
      36             : #include "replication/slot.h"
      37             : #include "replication/walreceiver.h"
      38             : #include "replication/worker_internal.h"
      39             : #include "storage/ipc.h"
      40             : #include "storage/proc.h"
      41             : #include "storage/procarray.h"
      42             : #include "storage/procsignal.h"
      43             : #include "tcop/tcopprot.h"
      44             : #include "utils/memutils.h"
      45             : #include "utils/pg_lsn.h"
      46             : #include "utils/ps_status.h"
      47             : #include "utils/snapmgr.h"
      48             : #include "utils/timeout.h"
      49             : 
      50             : /* max sleep time between cycles (3min) */
      51             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      52             : 
      53             : int         max_logical_replication_workers = 4;
      54             : int         max_sync_workers_per_subscription = 2;
      55             : 
      56             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      57             : 
      58             : typedef struct LogicalRepCtxStruct
      59             : {
      60             :     /* Supervisor process. */
      61             :     pid_t       launcher_pid;
      62             : 
      63             :     /* Background workers. */
      64             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      65             : } LogicalRepCtxStruct;
      66             : 
      67             : LogicalRepCtxStruct *LogicalRepCtx;
      68             : 
      69             : typedef struct LogicalRepWorkerId
      70             : {
      71             :     Oid         subid;
      72             :     Oid         relid;
      73             : } LogicalRepWorkerId;
      74             : 
      75             : typedef struct StopWorkersData
      76             : {
      77             :     int         nestDepth;      /* Sub-transaction nest level */
      78             :     List       *workers;        /* List of LogicalRepWorkerId */
      79             :     struct StopWorkersData *parent; /* This need not be an immediate
      80             :                                      * subtransaction parent */
      81             : } StopWorkersData;
      82             : 
      83             : /*
      84             :  * Stack of StopWorkersData elements. Each stack element contains the workers
      85             :  * to be stopped for that subtransaction.
      86             :  */
      87             : static StopWorkersData *on_commit_stop_workers = NULL;
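/*
 * Editor's sketch, not part of the measured source: suppose a transaction
 * at nest depth 1 schedules worker A to stop, then opens a savepoint
 * (depth 2) and schedules worker B.  The stack then looks like
 *
 *     on_commit_stop_workers
 *       -> {nestDepth = 2, workers = [B]}
 *       -> {nestDepth = 1, workers = [A], parent = NULL}
 *
 * AtEOSubXact_ApplyLauncher() merges [B] into the depth-1 element on
 * subcommit (or discards it on subabort), and AtEOXact_ApplyLauncher()
 * stops whatever remains when the top-level transaction commits.
 */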
      88             : 
      89             : static void ApplyLauncherWakeup(void);
      90             : static void logicalrep_launcher_onexit(int code, Datum arg);
      91             : static void logicalrep_worker_onexit(int code, Datum arg);
      92             : static void logicalrep_worker_detach(void);
      93             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
      94             : 
      95             : /* Flags set by signal handlers */
      96             : static volatile sig_atomic_t got_SIGHUP = false;
      97             : 
      98             : static bool on_commit_launcher_wakeup = false;
      99             : 
     100             : Datum       pg_stat_get_subscription(PG_FUNCTION_ARGS);
     101             : 
     102             : 
     103             : /*
     104             :  * Load the list of subscriptions.
     105             :  *
     106             :  * Only the fields interesting for worker start/stop functions are filled for
     107             :  * each subscription.
     108             :  */
     109             : static List *
     110        2018 : get_subscription_list(void)
     111             : {
     112        2018 :     List       *res = NIL;
     113             :     Relation    rel;
     114             :     TableScanDesc scan;
     115             :     HeapTuple   tup;
     116             :     MemoryContext resultcxt;
     117             : 
     118             :     /* This is the context that we will allocate our output data in */
     119        2018 :     resultcxt = CurrentMemoryContext;
     120             : 
     121             :     /*
     122             :      * Start a transaction so we can access pg_database, and get a snapshot.
     123             :      * We don't have a use for the snapshot itself, but we're interested in
     124             :      * the secondary effect that it sets RecentGlobalXmin.  (This is critical
     125             :      * for anything that reads heap pages, because HOT may decide to prune
     126             :      * them even if the process doesn't attempt to modify any tuples.)
     127             :      */
     128        2018 :     StartTransactionCommand();
     129        2018 :     (void) GetTransactionSnapshot();
     130             : 
     131        2018 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     132        2018 :     scan = table_beginscan_catalog(rel, 0, NULL);
     133             : 
     134        4150 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     135             :     {
     136         114 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     137             :         Subscription *sub;
     138             :         MemoryContext oldcxt;
     139             : 
     140             :         /*
     141             :          * Allocate our results in the caller's context, not the
     142             :          * transaction's. We do this inside the loop, and restore the original
     143             :          * context at the end, so that leaky things like heap_getnext() are
     144             :          * not called in a potentially long-lived context.
     145             :          */
     146         114 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     147             : 
     148         114 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     149         114 :         sub->oid = subform->oid;
     150         114 :         sub->dbid = subform->subdbid;
     151         114 :         sub->owner = subform->subowner;
     152         114 :         sub->enabled = subform->subenabled;
     153         114 :         sub->name = pstrdup(NameStr(subform->subname));
     154             :         /* We don't fill fields we are not interested in. */
     155             : 
     156         114 :         res = lappend(res, sub);
     157         114 :         MemoryContextSwitchTo(oldcxt);
     158             :     }
     159             : 
     160        2018 :     table_endscan(scan);
     161        2018 :     table_close(rel, AccessShareLock);
     162             : 
     163        2018 :     CommitTransactionCommand();
     164             : 
     165        2018 :     return res;
     166             : }
     167             : 
     168             : /*
     169             :  * Wait for a background worker to start up and attach to the shmem context.
     170             :  *
     171             :  * This is only needed for cleaning up the shared memory in case the worker
     172             :  * fails to attach.
     173             :  */
     174             : static void
     175         752 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     176             :                                uint16 generation,
     177             :                                BackgroundWorkerHandle *handle)
     178             : {
     179             :     BgwHandleStatus status;
     180             :     int         rc;
     181             : 
     182             :     for (;;)
     183         638 :     {
     184             :         pid_t       pid;
     185             : 
     186         752 :         CHECK_FOR_INTERRUPTS();
     187             : 
     188         752 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     189             : 
     190             :         /* Worker either died or has started; no need to do anything. */
     191         752 :         if (!worker->in_use || worker->proc)
     192             :         {
     193         114 :             LWLockRelease(LogicalRepWorkerLock);
     194         114 :             return;
     195             :         }
     196             : 
     197         638 :         LWLockRelease(LogicalRepWorkerLock);
     198             : 
     199             :         /* Check if worker has died before attaching, and clean up after it. */
     200         638 :         status = GetBackgroundWorkerPid(handle, &pid);
     201             : 
     202         638 :         if (status == BGWH_STOPPED)
     203             :         {
     204           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     205             :             /* Ensure that this was indeed the worker we waited for. */
     206           0 :             if (generation == worker->generation)
     207           0 :                 logicalrep_worker_cleanup(worker);
     208           0 :             LWLockRelease(LogicalRepWorkerLock);
     209           0 :             return;
     210             :         }
     211             : 
     212             :         /*
      213             :          * We need a timeout because we generally don't get notified via latch
     214             :          * about the worker attach.  But we don't expect to have to wait long.
     215             :          */
     216         638 :         rc = WaitLatch(MyLatch,
     217             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     218             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     219             : 
     220         638 :         if (rc & WL_LATCH_SET)
     221             :         {
     222         170 :             ResetLatch(MyLatch);
     223         170 :             CHECK_FOR_INTERRUPTS();
     224             :         }
     225             :     }
     226             : 
     227             :     return;
     228             : }
     229             : 
     230             : /*
      231             :  * Walks the workers array and searches for one that matches the given
     232             :  * subscription id and relid.
     233             :  */
     234             : LogicalRepWorker *
     235        1040 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     236             : {
     237             :     int         i;
     238        1040 :     LogicalRepWorker *res = NULL;
     239             : 
     240             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     241             : 
     242             :     /* Search for attached worker for a given subscription id. */
     243        3640 :     for (i = 0; i < max_logical_replication_workers; i++)
     244             :     {
     245        3092 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     246             : 
     247        3584 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     248         564 :             (!only_running || w->proc))
     249             :         {
     250         492 :             res = w;
     251         492 :             break;
     252             :         }
     253             :     }
     254             : 
     255        1040 :     return res;
     256             : }
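/*
 * Editor's sketch, not part of the measured source: per the Assert above,
 * callers must already hold LogicalRepWorkerLock, as the launcher's main
 * loop below does:
 *
 *     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 *     w = logicalrep_worker_find(sub->oid, InvalidOid, false);
 *     LWLockRelease(LogicalRepWorkerLock);
 *
 * The result points into shared memory, so once the lock is released the
 * slot may be recycled; the generation counter (see logicalrep_worker_stop)
 * is how callers detect that.
 */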
     257             : 
     258             : /*
      259             :  * Similar to logicalrep_worker_find(), but returns a list of all workers
      260             :  * for the subscription instead of just one.
     261             :  */
     262             : List *
     263          28 : logicalrep_workers_find(Oid subid, bool only_running)
     264             : {
     265             :     int         i;
     266          28 :     List       *res = NIL;
     267             : 
     268             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     269             : 
     270             :     /* Search for attached worker for a given subscription id. */
     271         140 :     for (i = 0; i < max_logical_replication_workers; i++)
     272             :     {
     273         112 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     274             : 
     275         112 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     276          14 :             res = lappend(res, w);
     277             :     }
     278             : 
     279          28 :     return res;
     280             : }
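/*
 * Editor's sketch, assuming the caller pattern used by DROP SUBSCRIPTION
 * (not part of the measured source): collect the workers under the lock,
 * release it, then stop each one, because logicalrep_worker_stop() takes
 * the lock itself:
 *
 *     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 *     subworkers = logicalrep_workers_find(subid, false);
 *     LWLockRelease(LogicalRepWorkerLock);
 *     foreach(lc, subworkers)
 *     {
 *         LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 *
 *         logicalrep_worker_stop(w->subid, w->relid);
 *     }
 */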
     281             : 
     282             : /*
      283             :  * Start a new apply background worker, if possible.
     284             :  */
     285             : void
     286         120 : logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
     287             :                          Oid relid)
     288             : {
     289             :     BackgroundWorker bgw;
     290             :     BackgroundWorkerHandle *bgw_handle;
     291             :     uint16      generation;
     292             :     int         i;
     293         120 :     int         slot = 0;
     294         120 :     LogicalRepWorker *worker = NULL;
     295             :     int         nsyncworkers;
     296             :     TimestampTz now;
     297             : 
     298         120 :     ereport(DEBUG1,
     299             :             (errmsg("starting logical replication worker for subscription \"%s\"",
     300             :                     subname)));
     301             : 
     302             :     /* Report this after the initial starting message for consistency. */
     303         120 :     if (max_replication_slots == 0)
     304           0 :         ereport(ERROR,
     305             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     306             :                  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
     307             : 
     308             :     /*
      309             :      * We need to modify the shared memory under the lock so that we have a
      310             :      * consistent view.
     311             :      */
     312         120 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     313             : 
     314             : retry:
     315             :     /* Find unused worker slot. */
     316         254 :     for (i = 0; i < max_logical_replication_workers; i++)
     317             :     {
     318         248 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     319             : 
     320         248 :         if (!w->in_use)
     321             :         {
     322         114 :             worker = w;
     323         114 :             slot = i;
     324         114 :             break;
     325             :         }
     326             :     }
     327             : 
     328         120 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     329             : 
     330         120 :     now = GetCurrentTimestamp();
     331             : 
     332             :     /*
      333             :      * If we didn't find a free slot, try to do garbage collection.  We do
      334             :      * this because if some worker failed to start up and its parent crashed
      335             :      * while waiting, the in_use state was never cleared.
     336             :      */
     337         120 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     338             :     {
     339           6 :         bool        did_cleanup = false;
     340             : 
     341          30 :         for (i = 0; i < max_logical_replication_workers; i++)
     342             :         {
     343          24 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     344             : 
     345             :             /*
     346             :              * If the worker was marked in use but didn't manage to attach in
     347             :              * time, clean it up.
     348             :              */
     349          24 :             if (w->in_use && !w->proc &&
     350           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     351             :                                            wal_receiver_timeout))
     352             :             {
     353           0 :                 elog(WARNING,
     354             :                      "logical replication worker for subscription %u took too long to start; canceled",
     355             :                      w->subid);
     356             : 
     357           0 :                 logicalrep_worker_cleanup(w);
     358           0 :                 did_cleanup = true;
     359             :             }
     360             :         }
     361             : 
     362           6 :         if (did_cleanup)
     363           0 :             goto retry;
     364             :     }
     365             : 
     366             :     /*
     367             :      * If we reached the sync worker limit per subscription, just exit
     368             :      * silently as we might get here because of an otherwise harmless race
     369             :      * condition.
     370             :      */
     371         120 :     if (nsyncworkers >= max_sync_workers_per_subscription)
     372             :     {
     373           0 :         LWLockRelease(LogicalRepWorkerLock);
     374           0 :         return;
     375             :     }
     376             : 
     377             :     /*
      378             :      * However, if there are no free worker slots, inform the user before
      379             :      * exiting.
     380             :      */
     381         120 :     if (worker == NULL)
     382             :     {
     383           6 :         LWLockRelease(LogicalRepWorkerLock);
     384           6 :         ereport(WARNING,
     385             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     386             :                  errmsg("out of logical replication worker slots"),
     387             :                  errhint("You might need to increase max_logical_replication_workers.")));
     388           6 :         return;
     389             :     }
     390             : 
     391             :     /* Prepare the worker slot. */
     392         114 :     worker->launch_time = now;
     393         114 :     worker->in_use = true;
     394         114 :     worker->generation++;
     395         114 :     worker->proc = NULL;
     396         114 :     worker->dbid = dbid;
     397         114 :     worker->userid = userid;
     398         114 :     worker->subid = subid;
     399         114 :     worker->relid = relid;
     400         114 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     401         114 :     worker->relstate_lsn = InvalidXLogRecPtr;
     402         114 :     worker->last_lsn = InvalidXLogRecPtr;
     403         114 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     404         114 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     405         114 :     worker->reply_lsn = InvalidXLogRecPtr;
     406         114 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     407             : 
     408             :     /* Before releasing lock, remember generation for future identification. */
     409         114 :     generation = worker->generation;
     410             : 
     411         114 :     LWLockRelease(LogicalRepWorkerLock);
     412             : 
     413             :     /* Register the new dynamic worker. */
     414         114 :     memset(&bgw, 0, sizeof(bgw));
     415         114 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     416             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     417         114 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     418         114 :     snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
     419         114 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     420         114 :     if (OidIsValid(relid))
     421          76 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
     422             :                  "logical replication worker for subscription %u sync %u", subid, relid);
     423             :     else
     424          38 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
     425             :                  "logical replication worker for subscription %u", subid);
     426         114 :     snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
     427             : 
     428         114 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     429         114 :     bgw.bgw_notify_pid = MyProcPid;
     430         114 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     431             : 
     432         114 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     433             :     {
     434             :         /* Failed to start worker, so clean up the worker slot. */
     435           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     436             :         Assert(generation == worker->generation);
     437           0 :         logicalrep_worker_cleanup(worker);
     438           0 :         LWLockRelease(LogicalRepWorkerLock);
     439             : 
     440           0 :         ereport(WARNING,
     441             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     442             :                  errmsg("out of background worker slots"),
     443             :                  errhint("You might need to increase max_worker_processes.")));
     444           0 :         return;
     445             :     }
     446             : 
     447             :     /* Now wait until it attaches. */
     448         114 :     WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     449             : }
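/*
 * Editor's note, not part of the measured source: the generation counter
 * remembered above is what makes worker-slot reuse safe.  A caller holding
 * only the LogicalRepWorker pointer could otherwise act on an unrelated
 * worker after the slot is recycled; rechecking
 *
 *     generation == worker->generation
 *
 * under the lock, as WaitForReplicationWorkerAttach() and
 * logicalrep_worker_stop() do, detects that ABA-style race.
 */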
     450             : 
     451             : /*
     452             :  * Stop the logical replication worker for subid/relid, if any, and wait until
     453             :  * it detaches from the slot.
     454             :  */
     455             : void
     456          24 : logicalrep_worker_stop(Oid subid, Oid relid)
     457             : {
     458             :     LogicalRepWorker *worker;
     459             :     uint16      generation;
     460             : 
     461          24 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     462             : 
     463          24 :     worker = logicalrep_worker_find(subid, relid, false);
     464             : 
     465             :     /* No worker, nothing to do. */
     466          24 :     if (!worker)
     467             :     {
     468          10 :         LWLockRelease(LogicalRepWorkerLock);
     469          10 :         return;
     470             :     }
     471             : 
     472             :     /*
      473             :      * Remember the worker's generation so we can later check whether what
      474             :      * we see is still the same worker.
     475             :      */
     476          14 :     generation = worker->generation;
     477             : 
     478             :     /*
     479             :      * If we found a worker but it does not have proc set then it is still
     480             :      * starting up; wait for it to finish starting and then kill it.
     481             :      */
     482          28 :     while (worker->in_use && !worker->proc)
     483             :     {
     484             :         int         rc;
     485             : 
     486           0 :         LWLockRelease(LogicalRepWorkerLock);
     487             : 
     488             :         /* Wait a bit --- we don't expect to have to wait long. */
     489           0 :         rc = WaitLatch(MyLatch,
     490             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     491             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     492             : 
     493           0 :         if (rc & WL_LATCH_SET)
     494             :         {
     495           0 :             ResetLatch(MyLatch);
     496           0 :             CHECK_FOR_INTERRUPTS();
     497             :         }
     498             : 
     499             :         /* Recheck worker status. */
     500           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     501             : 
     502             :         /*
     503             :          * Check whether the worker slot is no longer used, which would mean
     504             :          * that the worker has exited, or whether the worker generation is
     505             :          * different, meaning that a different worker has taken the slot.
     506             :          */
     507           0 :         if (!worker->in_use || worker->generation != generation)
     508             :         {
     509           0 :             LWLockRelease(LogicalRepWorkerLock);
     510           0 :             return;
     511             :         }
     512             : 
     513             :         /* Worker has assigned proc, so it has started. */
     514           0 :         if (worker->proc)
     515           0 :             break;
     516             :     }
     517             : 
     518             :     /* Now terminate the worker ... */
     519          14 :     kill(worker->proc->pid, SIGTERM);
     520             : 
     521             :     /* ... and wait for it to die. */
     522             :     for (;;)
     523          16 :     {
     524             :         int         rc;
     525             : 
     526             :         /* is it gone? */
     527          30 :         if (!worker->proc || worker->generation != generation)
     528             :             break;
     529             : 
     530          16 :         LWLockRelease(LogicalRepWorkerLock);
     531             : 
     532             :         /* Wait a bit --- we don't expect to have to wait long. */
     533          16 :         rc = WaitLatch(MyLatch,
     534             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     535             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     536             : 
     537          16 :         if (rc & WL_LATCH_SET)
     538             :         {
     539           2 :             ResetLatch(MyLatch);
     540           2 :             CHECK_FOR_INTERRUPTS();
     541             :         }
     542             : 
     543          16 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     544             :     }
     545             : 
     546          14 :     LWLockRelease(LogicalRepWorkerLock);
     547             : }
     548             : 
     549             : /*
      550             :  * Request that the worker for the specified sub/rel be stopped on commit.
     551             :  */
     552             : void
     553          10 : logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
     554             : {
     555          10 :     int         nestDepth = GetCurrentTransactionNestLevel();
     556             :     LogicalRepWorkerId *wid;
     557             :     MemoryContext oldctx;
     558             : 
      559             :     /* Make sure we store the info in a context that survives until commit. */
     560          10 :     oldctx = MemoryContextSwitchTo(TopTransactionContext);
     561             : 
     562             :     /* Check that previous transactions were properly cleaned up. */
     563             :     Assert(on_commit_stop_workers == NULL ||
     564             :            nestDepth >= on_commit_stop_workers->nestDepth);
     565             : 
     566             :     /*
     567             :      * Push a new stack element if we don't already have one for the current
     568             :      * nestDepth.
     569             :      */
     570          18 :     if (on_commit_stop_workers == NULL ||
     571           8 :         nestDepth > on_commit_stop_workers->nestDepth)
     572             :     {
     573           2 :         StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
     574             : 
     575           2 :         newdata->nestDepth = nestDepth;
     576           2 :         newdata->workers = NIL;
     577           2 :         newdata->parent = on_commit_stop_workers;
     578           2 :         on_commit_stop_workers = newdata;
     579             :     }
     580             : 
     581             :     /*
     582             :      * Finally add a new worker into the worker list of the current
     583             :      * subtransaction.
     584             :      */
     585          10 :     wid = palloc(sizeof(LogicalRepWorkerId));
     586          10 :     wid->subid = subid;
     587          10 :     wid->relid = relid;
     588          20 :     on_commit_stop_workers->workers =
     589          10 :         lappend(on_commit_stop_workers->workers, wid);
     590             : 
     591          10 :     MemoryContextSwitchTo(oldctx);
     592          10 : }
     593             : 
     594             : /*
      595             :  * Wake up (using latch) any logical replication worker for the specified sub/rel.
     596             :  */
     597             : void
     598          72 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     599             : {
     600             :     LogicalRepWorker *worker;
     601             : 
     602          72 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     603             : 
     604          72 :     worker = logicalrep_worker_find(subid, relid, true);
     605             : 
     606          72 :     if (worker)
     607          72 :         logicalrep_worker_wakeup_ptr(worker);
     608             : 
     609          72 :     LWLockRelease(LogicalRepWorkerLock);
     610          72 : }
     611             : 
     612             : /*
     613             :  * Wake up (using latch) the specified logical replication worker.
     614             :  *
     615             :  * Caller must hold lock, else worker->proc could change under us.
     616             :  */
     617             : void
     618         216 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     619             : {
     620             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     621             : 
     622         216 :     SetLatch(&worker->proc->procLatch);
     623         216 : }
     624             : 
     625             : /*
     626             :  * Attach to a slot.
     627             :  */
     628             : void
     629         126 : logicalrep_worker_attach(int slot)
     630             : {
     631             :     /* Block concurrent access. */
     632         126 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     633             : 
     634             :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     635         126 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     636             : 
     637         126 :     if (!MyLogicalRepWorker->in_use)
     638             :     {
     639           0 :         LWLockRelease(LogicalRepWorkerLock);
     640           0 :         ereport(ERROR,
     641             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     642             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     643             :                         slot)));
     644             :     }
     645             : 
     646         126 :     if (MyLogicalRepWorker->proc)
     647             :     {
     648           0 :         LWLockRelease(LogicalRepWorkerLock);
     649           0 :         ereport(ERROR,
     650             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     651             :                  errmsg("logical replication worker slot %d is already used by "
     652             :                         "another worker, cannot attach", slot)));
     653             :     }
     654             : 
     655         126 :     MyLogicalRepWorker->proc = MyProc;
     656         126 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     657             : 
     658         126 :     LWLockRelease(LogicalRepWorkerLock);
     659         126 : }
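/*
 * Editor's sketch, not part of the measured source: the slot index comes
 * through bgw_main_arg, set in logicalrep_worker_launch() above, so the
 * worker entry point (ApplyWorkerMain, in worker.c) presumably starts out
 * roughly like
 *
 *     void
 *     ApplyWorkerMain(Datum main_arg)
 *     {
 *         int    worker_slot = DatumGetInt32(main_arg);
 *
 *         logicalrep_worker_attach(worker_slot);
 *         ...
 *     }
 */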
     660             : 
     661             : /*
     662             :  * Detach the worker (cleans up the worker info).
     663             :  */
     664             : static void
     665         126 : logicalrep_worker_detach(void)
     666             : {
     667             :     /* Block concurrent access. */
     668         126 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     669             : 
     670         126 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     671             : 
     672         126 :     LWLockRelease(LogicalRepWorkerLock);
     673         126 : }
     674             : 
     675             : /*
     676             :  * Clean up worker info.
     677             :  */
     678             : static void
     679         126 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     680             : {
     681             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     682             : 
     683         126 :     worker->in_use = false;
     684         126 :     worker->proc = NULL;
     685         126 :     worker->dbid = InvalidOid;
     686         126 :     worker->userid = InvalidOid;
     687         126 :     worker->subid = InvalidOid;
     688         126 :     worker->relid = InvalidOid;
     689         126 : }
     690             : 
     691             : /*
     692             :  * Cleanup function for logical replication launcher.
     693             :  *
     694             :  * Called on logical replication launcher exit.
     695             :  */
     696             : static void
     697         346 : logicalrep_launcher_onexit(int code, Datum arg)
     698             : {
     699         346 :     LogicalRepCtx->launcher_pid = 0;
     700         346 : }
     701             : 
     702             : /*
     703             :  * Cleanup function.
     704             :  *
     705             :  * Called on logical replication worker exit.
     706             :  */
     707             : static void
     708         126 : logicalrep_worker_onexit(int code, Datum arg)
     709             : {
     710             :     /* Disconnect gracefully from the remote side. */
     711         126 :     if (wrconn)
     712         120 :         walrcv_disconnect(wrconn);
     713             : 
     714         126 :     logicalrep_worker_detach();
     715             : 
     716         126 :     ApplyLauncherWakeup();
     717         126 : }
     718             : 
     719             : /* SIGHUP: set flag to reload configuration at next convenient time */
     720             : static void
     721           8 : logicalrep_launcher_sighup(SIGNAL_ARGS)
     722             : {
     723           8 :     int         save_errno = errno;
     724             : 
     725           8 :     got_SIGHUP = true;
     726             : 
     727             :     /* Waken anything waiting on the process latch */
     728           8 :     SetLatch(MyLatch);
     729             : 
     730           8 :     errno = save_errno;
     731           8 : }
     732             : 
     733             : /*
     734             :  * Count the number of registered (not necessarily running) sync workers
     735             :  * for a subscription.
     736             :  */
     737             : int
     738         620 : logicalrep_sync_worker_count(Oid subid)
     739             : {
     740             :     int         i;
     741         620 :     int         res = 0;
     742             : 
     743             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     744             : 
      745             :     /* Count sync workers (those with a valid relid) for the given subscription. */
     746        3100 :     for (i = 0; i < max_logical_replication_workers; i++)
     747             :     {
     748        2480 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     749             : 
     750        2480 :         if (w->subid == subid && OidIsValid(w->relid))
     751         892 :             res++;
     752             :     }
     753             : 
     754         620 :     return res;
     755             : }
     756             : 
     757             : /*
     758             :  * ApplyLauncherShmemSize
     759             :  *      Compute space needed for replication launcher shared memory
     760             :  */
     761             : Size
     762        5674 : ApplyLauncherShmemSize(void)
     763             : {
     764             :     Size        size;
     765             : 
     766             :     /*
     767             :      * Need the fixed struct and the array of LogicalRepWorker.
     768             :      */
     769        5674 :     size = sizeof(LogicalRepCtxStruct);
     770        5674 :     size = MAXALIGN(size);
     771        5674 :     size = add_size(size, mul_size(max_logical_replication_workers,
     772             :                                    sizeof(LogicalRepWorker)));
     773        5674 :     return size;
     774             : }
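/*
 * Editor's worked example, not part of the measured source: with the
 * default max_logical_replication_workers = 4 this evaluates to
 *
 *     MAXALIGN(sizeof(LogicalRepCtxStruct)) + 4 * sizeof(LogicalRepWorker)
 *
 * where add_size() and mul_size() are the overflow-checked counterparts of
 * "+" and "*" conventionally used when sizing shared memory.
 */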
     775             : 
     776             : /*
     777             :  * ApplyLauncherRegister
     778             :  *      Register a background worker running the logical replication launcher.
     779             :  */
     780             : void
     781         582 : ApplyLauncherRegister(void)
     782             : {
     783             :     BackgroundWorker bgw;
     784             : 
     785         582 :     if (max_logical_replication_workers == 0)
     786           0 :         return;
     787             : 
     788         582 :     memset(&bgw, 0, sizeof(bgw));
     789         582 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     790             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     791         582 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     792         582 :     snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
     793         582 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     794         582 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     795             :              "logical replication launcher");
     796         582 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
     797             :              "logical replication launcher");
     798         582 :     bgw.bgw_restart_time = 5;
     799         582 :     bgw.bgw_notify_pid = 0;
     800         582 :     bgw.bgw_main_arg = (Datum) 0;
     801             : 
     802         582 :     RegisterBackgroundWorker(&bgw);
     803             : }
     804             : 
     805             : /*
     806             :  * ApplyLauncherShmemInit
     807             :  *      Allocate and initialize replication launcher shared memory
     808             :  */
     809             : void
     810        1890 : ApplyLauncherShmemInit(void)
     811             : {
     812             :     bool        found;
     813             : 
     814        1890 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     815        1890 :         ShmemInitStruct("Logical Replication Launcher Data",
     816             :                         ApplyLauncherShmemSize(),
     817             :                         &found);
     818             : 
     819        1890 :     if (!found)
     820             :     {
     821             :         int         slot;
     822             : 
     823        1890 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     824             : 
     825             :         /* Initialize memory and spin locks for each worker slot. */
     826        9450 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
     827             :         {
     828        7560 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
     829             : 
     830        7560 :             memset(worker, 0, sizeof(LogicalRepWorker));
     831        7560 :             SpinLockInit(&worker->relmutex);
     832             :         }
     833             :     }
     834        1890 : }
     835             : 
     836             : /*
     837             :  * Check whether current transaction has manipulated logical replication
     838             :  * workers.
     839             :  */
     840             : bool
     841          84 : XactManipulatesLogicalReplicationWorkers(void)
     842             : {
     843          84 :     return (on_commit_stop_workers != NULL);
     844             : }
     845             : 
     846             : /*
     847             :  * Wakeup the launcher on commit if requested.
     848             :  */
     849             : void
     850      458506 : AtEOXact_ApplyLauncher(bool isCommit)
     851             : {
     852             : 
     853             :     Assert(on_commit_stop_workers == NULL ||
     854             :            (on_commit_stop_workers->nestDepth == 1 &&
     855             :             on_commit_stop_workers->parent == NULL));
     856             : 
     857      458506 :     if (isCommit)
     858             :     {
     859             :         ListCell   *lc;
     860             : 
     861      439150 :         if (on_commit_stop_workers != NULL)
     862             :         {
     863           2 :             List       *workers = on_commit_stop_workers->workers;
     864             : 
     865          12 :             foreach(lc, workers)
     866             :             {
     867          10 :                 LogicalRepWorkerId *wid = lfirst(lc);
     868             : 
     869          10 :                 logicalrep_worker_stop(wid->subid, wid->relid);
     870             :             }
     871             :         }
     872             : 
     873      439150 :         if (on_commit_launcher_wakeup)
     874          38 :             ApplyLauncherWakeup();
     875             :     }
     876             : 
     877             :     /*
     878             :      * No need to pfree on_commit_stop_workers.  It was allocated in
     879             :      * transaction memory context, which is going to be cleaned soon.
     880             :      */
     881      458506 :     on_commit_stop_workers = NULL;
     882      458506 :     on_commit_launcher_wakeup = false;
     883      458506 : }
     884             : 
     885             : /*
     886             :  * On commit, merge the current on_commit_stop_workers list into the
     887             :  * immediate parent, if present.
     888             :  * On rollback, discard the current on_commit_stop_workers list.
      889             :  * Either way, pop the stack.
     890             :  */
     891             : void
     892        7360 : AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
     893             : {
     894             :     StopWorkersData *parent;
     895             : 
     896             :     /* Exit immediately if there's no work to do at this level. */
     897        7360 :     if (on_commit_stop_workers == NULL ||
     898           0 :         on_commit_stop_workers->nestDepth < nestDepth)
     899        7360 :         return;
     900             : 
     901             :     Assert(on_commit_stop_workers->nestDepth == nestDepth);
     902             : 
     903           0 :     parent = on_commit_stop_workers->parent;
     904             : 
     905           0 :     if (isCommit)
     906             :     {
     907             :         /*
     908             :          * If the upper stack element is not an immediate parent
     909             :          * subtransaction, just decrement the notional nesting depth without
     910             :          * doing any real work.  Else, we need to merge the current workers
     911             :          * list into the parent.
     912             :          */
     913           0 :         if (!parent || parent->nestDepth < nestDepth - 1)
     914             :         {
     915           0 :             on_commit_stop_workers->nestDepth--;
     916           0 :             return;
     917             :         }
     918             : 
     919           0 :         parent->workers =
     920           0 :             list_concat(parent->workers, on_commit_stop_workers->workers);
     921             :     }
     922             :     else
     923             :     {
     924             :         /*
     925             :          * Abandon everything that was done at this nesting level.  Explicitly
     926             :          * free memory to avoid a transaction-lifespan leak.
     927             :          */
     928           0 :         list_free_deep(on_commit_stop_workers->workers);
     929             :     }
     930             : 
     931             :     /*
      932             :      * We have taken care of the current subtransaction's workers list for
      933             :      * both abort and commit, so we are ready to pop the stack.
     934             :      */
     935           0 :     pfree(on_commit_stop_workers);
     936           0 :     on_commit_stop_workers = parent;
     937             : }
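/*
 * Editor's worked example, not part of the measured source: if the stack
 * holds elements at depths 4 and 2, committing the depth-4 subtransaction
 * into depth 3 finds no immediate-parent element (2 < 4 - 1), so it merely
 * decrements the top element's nestDepth to 3.  Committing again, now into
 * depth 2, concatenates that element's workers into the depth-2 element
 * and pops the stack.
 */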
     938             : 
     939             : /*
     940             :  * Request wakeup of the launcher on commit of the transaction.
     941             :  *
      942             :  * This is used to signal the launcher to stop sleeping and process the
      943             :  * subscriptions when the current transaction commits. It should be used
      944             :  * when a new tuple has been added to the pg_subscription catalog.
      945             :  */
     946             : void
     947          38 : ApplyLauncherWakeupAtCommit(void)
     948             : {
     949          38 :     if (!on_commit_launcher_wakeup)
     950          38 :         on_commit_launcher_wakeup = true;
     951          38 : }
     952             : 
     953             : static void
     954         164 : ApplyLauncherWakeup(void)
     955             : {
     956         164 :     if (LogicalRepCtx->launcher_pid != 0)
     957         152 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
     958         164 : }
     959             : 
     960             : /*
     961             :  * Main loop for the apply launcher process.
     962             :  */
     963             : void
     964         346 : ApplyLauncherMain(Datum main_arg)
     965             : {
     966         346 :     TimestampTz last_start_time = 0;
     967             : 
     968         346 :     ereport(DEBUG1,
     969             :             (errmsg("logical replication launcher started")));
     970             : 
     971         346 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
     972             : 
     973             :     Assert(LogicalRepCtx->launcher_pid == 0);
     974         346 :     LogicalRepCtx->launcher_pid = MyProcPid;
     975             : 
     976             :     /* Establish signal handlers. */
     977         346 :     pqsignal(SIGHUP, logicalrep_launcher_sighup);
     978         346 :     pqsignal(SIGTERM, die);
     979         346 :     BackgroundWorkerUnblockSignals();
     980             : 
     981             :     /*
     982             :      * Establish connection to nailed catalogs (we only ever access
     983             :      * pg_subscription).
     984             :      */
     985         346 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
     986             : 
     987             :     /* Enter main loop */
     988             :     for (;;)
     989        1716 :     {
     990             :         int         rc;
     991             :         List       *sublist;
     992             :         ListCell   *lc;
     993             :         MemoryContext subctx;
     994             :         MemoryContext oldctx;
     995             :         TimestampTz now;
     996        2062 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
     997             : 
     998        2062 :         CHECK_FOR_INTERRUPTS();
     999             : 
    1000        2060 :         now = GetCurrentTimestamp();
    1001             : 
    1002             :         /* Limit the start retry to once a wal_retrieve_retry_interval */
    1003        2060 :         if (TimestampDifferenceExceeds(last_start_time, now,
    1004             :                                        wal_retrieve_retry_interval))
    1005             :         {
    1006             :             /* Use temporary context for the database list and worker info. */
    1007        2018 :             subctx = AllocSetContextCreate(TopMemoryContext,
    1008             :                                            "Logical Replication Launcher sublist",
    1009             :                                            ALLOCSET_DEFAULT_SIZES);
    1010        2018 :             oldctx = MemoryContextSwitchTo(subctx);
    1011             : 
    1012             :             /* search for subscriptions to start or stop. */
    1013        2018 :             sublist = get_subscription_list();
    1014             : 
    1015             :             /* Start the missing workers for enabled subscriptions. */
    1016        2132 :             foreach(lc, sublist)
    1017             :             {
    1018         114 :                 Subscription *sub = (Subscription *) lfirst(lc);
    1019             :                 LogicalRepWorker *w;
    1020             : 
    1021         114 :                 if (!sub->enabled)
    1022           6 :                     continue;
    1023             : 
    1024         108 :                 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1025         108 :                 w = logicalrep_worker_find(sub->oid, InvalidOid, false);
    1026         108 :                 LWLockRelease(LogicalRepWorkerLock);
    1027             : 
    1028         108 :                 if (w == NULL)
    1029             :                 {
    1030          38 :                     last_start_time = now;
    1031          38 :                     wait_time = wal_retrieve_retry_interval;
    1032             : 
    1033          38 :                     logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
    1034             :                                              sub->owner, InvalidOid);
    1035             :                 }
    1036             :             }
    1037             : 
    1038             :             /* Switch back to original memory context. */
    1039        2018 :             MemoryContextSwitchTo(oldctx);
    1040             :             /* Clean the temporary memory. */
    1041        2018 :             MemoryContextDelete(subctx);
    1042             :         }
    1043             :         else
    1044             :         {
    1045             :             /*
     1046             :              * The wait in the previous cycle was interrupted less than
     1047             :              * wal_retrieve_retry_interval after the last worker was started.
     1048             :              * This usually means the worker crashed, so retry after
     1049             :              * wal_retrieve_retry_interval again.
    1050             :              */
    1051          42 :             wait_time = wal_retrieve_retry_interval;
    1052             :         }
    1053             : 
    1054             :         /* Wait for more work. */
    1055        2060 :         rc = WaitLatch(MyLatch,
    1056             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1057             :                        wait_time,
    1058             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1059             : 
    1060        2054 :         if (rc & WL_LATCH_SET)
    1061             :         {
    1062        2030 :             ResetLatch(MyLatch);
    1063        2030 :             CHECK_FOR_INTERRUPTS();
    1064             :         }
    1065             : 
    1066        1716 :         if (got_SIGHUP)
    1067             :         {
    1068           8 :             got_SIGHUP = false;
    1069           8 :             ProcessConfigFile(PGC_SIGHUP);
    1070             :         }
    1071             :     }
    1072             : 
    1073             :     /* Not reachable */
    1074             : }
    1075             : 
    1076             : /*
     1077             :  * Is the current process the logical replication launcher?
    1078             :  */
    1079             : bool
    1080         348 : IsLogicalLauncher(void)
    1081             : {
    1082         348 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1083             : }
    1084             : 
    1085             : /*
     1086             :  * Returns the state of the subscriptions.
    1087             :  */
    1088             : Datum
    1089           2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1090             : {
    1091             : #define PG_STAT_GET_SUBSCRIPTION_COLS   8
    1092           2 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1093             :     int         i;
    1094           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1095             :     TupleDesc   tupdesc;
    1096             :     Tuplestorestate *tupstore;
    1097             :     MemoryContext per_query_ctx;
    1098             :     MemoryContext oldcontext;
    1099             : 
    1100             :     /* check to see if caller supports us returning a tuplestore */
    1101           2 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
    1102           0 :         ereport(ERROR,
    1103             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1104             :                  errmsg("set-valued function called in context that cannot accept a set")));
    1105           2 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
    1106           0 :         ereport(ERROR,
    1107             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1108             :                  errmsg("materialize mode required, but it is not " \
    1109             :                         "allowed in this context")));
    1110             : 
    1111             :     /* Build a tuple descriptor for our result type */
    1112           2 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
    1113           0 :         elog(ERROR, "return type must be a row type");
    1114             : 
    1115           2 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    1116           2 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
    1117             : 
    1118           2 :     tupstore = tuplestore_begin_heap(true, false, work_mem);
    1119           2 :     rsinfo->returnMode = SFRM_Materialize;
    1120           2 :     rsinfo->setResult = tupstore;
    1121           2 :     rsinfo->setDesc = tupdesc;
    1122             : 
    1123           2 :     MemoryContextSwitchTo(oldcontext);
    1124             : 
    1125             :     /* Make sure we get consistent view of the workers. */
    1126           2 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1127             : 
     1128          12 :     for (i = 0; i < max_logical_replication_workers; i++)
    1129             :     {
    1130             :         /* for each row */
    1131             :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS];
    1132             :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
    1133             :         int         worker_pid;
    1134             :         LogicalRepWorker worker;
    1135             : 
    1136          10 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1137             :                sizeof(LogicalRepWorker));
    1138          10 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1139          12 :             continue;
    1140             : 
    1141           4 :         if (OidIsValid(subid) && worker.subid != subid)
    1142           0 :             continue;
    1143             : 
    1144           4 :         worker_pid = worker.proc->pid;
    1145             : 
    1146           4 :         MemSet(values, 0, sizeof(values));
    1147           4 :         MemSet(nulls, 0, sizeof(nulls));
    1148             : 
    1149           4 :         values[0] = ObjectIdGetDatum(worker.subid);
    1150           4 :         if (OidIsValid(worker.relid))
    1151           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1152             :         else
    1153           4 :             nulls[1] = true;
    1154           4 :         values[2] = Int32GetDatum(worker_pid);
    1155           4 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
    1156           0 :             nulls[3] = true;
    1157             :         else
    1158           4 :             values[3] = LSNGetDatum(worker.last_lsn);
    1159           4 :         if (worker.last_send_time == 0)
    1160           0 :             nulls[4] = true;
    1161             :         else
    1162           4 :             values[4] = TimestampTzGetDatum(worker.last_send_time);
    1163           4 :         if (worker.last_recv_time == 0)
    1164           0 :             nulls[5] = true;
    1165             :         else
    1166           4 :             values[5] = TimestampTzGetDatum(worker.last_recv_time);
    1167           4 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1168           0 :             nulls[6] = true;
    1169             :         else
    1170           4 :             values[6] = LSNGetDatum(worker.reply_lsn);
    1171           4 :         if (worker.reply_time == 0)
    1172           0 :             nulls[7] = true;
    1173             :         else
    1174           4 :             values[7] = TimestampTzGetDatum(worker.reply_time);
    1175             : 
    1176           4 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    1177             : 
    1178             :         /*
    1179             :          * If only a single subscription was requested, and we found it,
    1180             :          * break.
    1181             :          */
    1182           4 :         if (OidIsValid(subid))
    1183           0 :             break;
    1184             :     }
    1185             : 
    1186           2 :     LWLockRelease(LogicalRepWorkerLock);
    1187             : 
    1188             :     /* clean up and return the tuplestore */
    1189             :     tuplestore_donestoring(tupstore);
    1190             : 
    1191           2 :     return (Datum) 0;
    1192             : }
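/*
 * Editor's note, not part of the measured source: this function backs the
 * pg_stat_subscription system view, so the usual way to exercise it is
 *
 *     SELECT * FROM pg_stat_subscription;
 *
 * which yields one row per attached worker, with a NULL relid for the main
 * apply worker as opposed to a tablesync worker.
 */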

Generated by: LCOV version 1.13