LCOV - code coverage report
Current view: top level - src/backend/replication/logical - launcher.c
Test: PostgreSQL 13beta1
Date: 2020-05-31 22:07:05
Coverage: Lines: 324 of 381 hit (85.0 %)    Functions: 26 of 26 hit (100.0 %)

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * launcher.c
       3             :  *     PostgreSQL logical replication worker launcher process
       4             :  *
       5             :  * Copyright (c) 2016-2020, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/launcher.c
       9             :  *
      10             :  * NOTES
      11             :  *    This module contains the logical replication worker launcher which
      12             :  *    uses the background worker infrastructure to start the logical
      13             :  *    replication workers for every enabled subscription.
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : 
      18             : #include "postgres.h"
      19             : 
      20             : #include "access/heapam.h"
      21             : #include "access/htup.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : #include "catalog/pg_subscription.h"
      26             : #include "catalog/pg_subscription_rel.h"
      27             : #include "funcapi.h"
      28             : #include "libpq/pqsignal.h"
      29             : #include "miscadmin.h"
      30             : #include "pgstat.h"
      31             : #include "postmaster/bgworker.h"
      32             : #include "postmaster/fork_process.h"
      33             : #include "postmaster/interrupt.h"
      34             : #include "postmaster/postmaster.h"
      35             : #include "replication/logicallauncher.h"
      36             : #include "replication/logicalworker.h"
      37             : #include "replication/slot.h"
      38             : #include "replication/walreceiver.h"
      39             : #include "replication/worker_internal.h"
      40             : #include "storage/ipc.h"
      41             : #include "storage/proc.h"
      42             : #include "storage/procarray.h"
      43             : #include "storage/procsignal.h"
      44             : #include "tcop/tcopprot.h"
      45             : #include "utils/memutils.h"
      46             : #include "utils/pg_lsn.h"
      47             : #include "utils/ps_status.h"
      48             : #include "utils/snapmgr.h"
      49             : #include "utils/timeout.h"
      50             : 
      51             : /* max sleep time between cycles (3min) */
      52             : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
      53             : 
      54             : int         max_logical_replication_workers = 4;
      55             : int         max_sync_workers_per_subscription = 2;
      56             : 
      57             : LogicalRepWorker *MyLogicalRepWorker = NULL;
      58             : 
      59             : typedef struct LogicalRepCtxStruct
      60             : {
      61             :     /* Supervisor process. */
      62             :     pid_t       launcher_pid;
      63             : 
      64             :     /* Background workers. */
      65             :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
      66             : } LogicalRepCtxStruct;
      67             : 
      68             : LogicalRepCtxStruct *LogicalRepCtx;
      69             : 
      70             : typedef struct LogicalRepWorkerId
      71             : {
      72             :     Oid         subid;
      73             :     Oid         relid;
      74             : } LogicalRepWorkerId;
      75             : 
      76             : typedef struct StopWorkersData
      77             : {
      78             :     int         nestDepth;      /* Sub-transaction nest level */
      79             :     List       *workers;        /* List of LogicalRepWorkerId */
      80             :     struct StopWorkersData *parent; /* This need not be an immediate
      81             :                                      * subtransaction parent */
      82             : } StopWorkersData;
      83             : 
      84             : /*
      85             :  * Stack of StopWorkersData elements. Each stack element contains the workers
      86             :  * to be stopped for that subtransaction.
      87             :  */
      88             : static StopWorkersData *on_commit_stop_workers = NULL;
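
A hypothetical walkthrough (not part of launcher.c; the statements are
illustrative only) of how this stack evolves across subtransactions:

    /*
     * BEGIN;                    -- nestDepth 1
     *   <detach rel r1>         -- push {nestDepth = 1, workers = [w1]}
     *   SAVEPOINT s1;           -- nestDepth 2
     *     <detach rel r2>       -- push {nestDepth = 2, workers = [w2]}
     *   RELEASE SAVEPOINT s1;   -- AtEOSubXact: merge [w2] into the
     *                           --   nestDepth-1 element
     * COMMIT;                   -- AtEOXact: stop w1 and w2, reset the stack
     */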
      89             : 
      90             : static void ApplyLauncherWakeup(void);
      91             : static void logicalrep_launcher_onexit(int code, Datum arg);
      92             : static void logicalrep_worker_onexit(int code, Datum arg);
      93             : static void logicalrep_worker_detach(void);
      94             : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
      95             : 
      96             : static bool on_commit_launcher_wakeup = false;
      97             : 
      98             : Datum       pg_stat_get_subscription(PG_FUNCTION_ARGS);
      99             : 
     100             : 
     101             : /*
     102             :  * Load the list of subscriptions.
     103             :  *
     104             :  * Only the fields interesting for worker start/stop functions are filled for
     105             :  * each subscription.
     106             :  */
     107             : static List *
     108        2060 : get_subscription_list(void)
     109             : {
     110        2060 :     List       *res = NIL;
     111             :     Relation    rel;
     112             :     TableScanDesc scan;
     113             :     HeapTuple   tup;
     114             :     MemoryContext resultcxt;
     115             : 
     116             :     /* This is the context that we will allocate our output data in */
     117        2060 :     resultcxt = CurrentMemoryContext;
     118             : 
     119             :     /*
      120             :      * Start a transaction so we can access pg_subscription, and get a snapshot.
     121             :      * We don't have a use for the snapshot itself, but we're interested in
     122             :      * the secondary effect that it sets RecentGlobalXmin.  (This is critical
     123             :      * for anything that reads heap pages, because HOT may decide to prune
     124             :      * them even if the process doesn't attempt to modify any tuples.)
     125             :      */
     126        2060 :     StartTransactionCommand();
     127        2060 :     (void) GetTransactionSnapshot();
     128             : 
     129        2060 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
     130        2060 :     scan = table_beginscan_catalog(rel, 0, NULL);
     131             : 
     132        2174 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     133             :     {
     134         114 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
     135             :         Subscription *sub;
     136             :         MemoryContext oldcxt;
     137             : 
     138             :         /*
     139             :          * Allocate our results in the caller's context, not the
     140             :          * transaction's. We do this inside the loop, and restore the original
     141             :          * context at the end, so that leaky things like heap_getnext() are
     142             :          * not called in a potentially long-lived context.
     143             :          */
     144         114 :         oldcxt = MemoryContextSwitchTo(resultcxt);
     145             : 
     146         114 :         sub = (Subscription *) palloc0(sizeof(Subscription));
     147         114 :         sub->oid = subform->oid;
     148         114 :         sub->dbid = subform->subdbid;
     149         114 :         sub->owner = subform->subowner;
     150         114 :         sub->enabled = subform->subenabled;
     151         114 :         sub->name = pstrdup(NameStr(subform->subname));
     152             :         /* We don't fill fields we are not interested in. */
     153             : 
     154         114 :         res = lappend(res, sub);
     155         114 :         MemoryContextSwitchTo(oldcxt);
     156             :     }
     157             : 
     158        2060 :     table_endscan(scan);
     159        2060 :     table_close(rel, AccessShareLock);
     160             : 
     161        2060 :     CommitTransactionCommand();
     162             : 
     163        2060 :     return res;
     164             : }
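
A minimal usage sketch, assuming the caller has first switched to a memory
context that outlives the scan (this mirrors what ApplyLauncherMain does
further down):

    List       *sublist = get_subscription_list();
    ListCell   *lc;

    foreach(lc, sublist)
    {
        Subscription *sub = (Subscription *) lfirst(lc);

        if (sub->enabled)
            elog(DEBUG1, "subscription \"%s\" (%u) is enabled",
                 sub->name, sub->oid);
    }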
     165             : 
     166             : /*
     167             :  * Wait for a background worker to start up and attach to the shmem context.
     168             :  *
     169             :  * This is only needed for cleaning up the shared memory in case the worker
     170             :  * fails to attach.
     171             :  */
     172             : static void
     173         860 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
     174             :                                uint16 generation,
     175             :                                BackgroundWorkerHandle *handle)
     176             : {
     177             :     BgwHandleStatus status;
     178             :     int         rc;
     179             : 
     180             :     for (;;)
     181         720 :     {
     182             :         pid_t       pid;
     183             : 
     184         860 :         CHECK_FOR_INTERRUPTS();
     185             : 
     186         860 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     187             : 
     188             :         /* Worker either died or has started; no need to do anything. */
     189         860 :         if (!worker->in_use || worker->proc)
     190             :         {
     191         140 :             LWLockRelease(LogicalRepWorkerLock);
     192         140 :             return;
     193             :         }
     194             : 
     195         720 :         LWLockRelease(LogicalRepWorkerLock);
     196             : 
     197             :         /* Check if worker has died before attaching, and clean up after it. */
     198         720 :         status = GetBackgroundWorkerPid(handle, &pid);
     199             : 
     200         720 :         if (status == BGWH_STOPPED)
     201             :         {
     202           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     203             :             /* Ensure that this was indeed the worker we waited for. */
     204           0 :             if (generation == worker->generation)
     205           0 :                 logicalrep_worker_cleanup(worker);
     206           0 :             LWLockRelease(LogicalRepWorkerLock);
     207           0 :             return;
     208             :         }
     209             : 
     210             :         /*
      211             :          * We need a timeout because we generally don't get notified via latch
     212             :          * about the worker attach.  But we don't expect to have to wait long.
     213             :          */
     214         720 :         rc = WaitLatch(MyLatch,
     215             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     216             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     217             : 
     218         720 :         if (rc & WL_LATCH_SET)
     219             :         {
     220         210 :             ResetLatch(MyLatch);
     221         210 :             CHECK_FOR_INTERRUPTS();
     222             :         }
     223             :     }
     224             : }
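
The (slot, generation) pair is how this file tells "the worker we launched"
apart from "whatever now occupies the slot".  A sketch of the recheck pattern
used throughout, assuming generation was saved while the lock was held:

    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    if (!worker->in_use || worker->generation != generation)
    {
        /* Slot was recycled for some other worker; stop tracking it. */
        LWLockRelease(LogicalRepWorkerLock);
        return;
    }
    /* Still our worker; worker->proc may be inspected here. */
    LWLockRelease(LogicalRepWorkerLock);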
     225             : 
     226             : /*
      227             :  * Walks the workers array and searches for one that matches the given
     228             :  * subscription id and relid.
     229             :  */
     230             : LogicalRepWorker *
     231        1284 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
     232             : {
     233             :     int         i;
     234        1284 :     LogicalRepWorker *res = NULL;
     235             : 
     236             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     237             : 
     238             :     /* Search for attached worker for a given subscription id. */
     239        4376 :     for (i = 0; i < max_logical_replication_workers; i++)
     240             :     {
     241        3742 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     242             : 
     243        3742 :         if (w->in_use && w->subid == subid && w->relid == relid &&
     244         650 :             (!only_running || w->proc))
     245             :         {
     246         650 :             res = w;
     247         650 :             break;
     248             :         }
     249             :     }
     250             : 
     251        1284 :     return res;
     252             : }
     253             : 
     254             : /*
      255             :  * Similar to logicalrep_worker_find(), but returns a list of all workers
      256             :  * for the subscription, instead of just one.
     257             :  */
     258             : List *
     259          30 : logicalrep_workers_find(Oid subid, bool only_running)
     260             : {
     261             :     int         i;
     262          30 :     List       *res = NIL;
     263             : 
     264             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     265             : 
     266             :     /* Search for attached worker for a given subscription id. */
     267         150 :     for (i = 0; i < max_logical_replication_workers; i++)
     268             :     {
     269         120 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     270             : 
     271         120 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
     272          16 :             res = lappend(res, w);
     273             :     }
     274             : 
     275          30 :     return res;
     276             : }
     277             : 
     278             : /*
      279             :  * Start a new apply background worker, if possible.
     280             :  */
     281             : void
     282         146 : logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
     283             :                          Oid relid)
     284             : {
     285             :     BackgroundWorker bgw;
     286             :     BackgroundWorkerHandle *bgw_handle;
     287             :     uint16      generation;
     288             :     int         i;
     289         146 :     int         slot = 0;
     290         146 :     LogicalRepWorker *worker = NULL;
     291             :     int         nsyncworkers;
     292             :     TimestampTz now;
     293             : 
     294         146 :     ereport(DEBUG1,
     295             :             (errmsg("starting logical replication worker for subscription \"%s\"",
     296             :                     subname)));
     297             : 
     298             :     /* Report this after the initial starting message for consistency. */
     299         146 :     if (max_replication_slots == 0)
     300           0 :         ereport(ERROR,
     301             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     302             :                  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
     303             : 
     304             :     /*
      305             :      * We need to modify the shared memory under lock so that we have a
      306             :      * consistent view.
     307             :      */
     308         146 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     309             : 
     310         146 : retry:
     311             :     /* Find unused worker slot. */
     312         316 :     for (i = 0; i < max_logical_replication_workers; i++)
     313             :     {
     314         310 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     315             : 
     316         310 :         if (!w->in_use)
     317             :         {
     318         140 :             worker = w;
     319         140 :             slot = i;
     320         140 :             break;
     321             :         }
     322             :     }
     323             : 
     324         146 :     nsyncworkers = logicalrep_sync_worker_count(subid);
     325             : 
     326         146 :     now = GetCurrentTimestamp();
     327             : 
     328             :     /*
      329             :      * If we didn't find a free slot, try to do garbage collection.  We do
      330             :      * this because if some worker failed to start up and its parent crashed
      331             :      * while waiting, the in_use state was never cleared.
     332             :      */
     333         146 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
     334             :     {
     335           6 :         bool        did_cleanup = false;
     336             : 
     337          30 :         for (i = 0; i < max_logical_replication_workers; i++)
     338             :         {
     339          24 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     340             : 
     341             :             /*
     342             :              * If the worker was marked in use but didn't manage to attach in
     343             :              * time, clean it up.
     344             :              */
     345          24 :             if (w->in_use && !w->proc &&
     346           0 :                 TimestampDifferenceExceeds(w->launch_time, now,
     347             :                                            wal_receiver_timeout))
     348             :             {
     349           0 :                 elog(WARNING,
     350             :                      "logical replication worker for subscription %u took too long to start; canceled",
     351             :                      w->subid);
     352             : 
     353           0 :                 logicalrep_worker_cleanup(w);
     354           0 :                 did_cleanup = true;
     355             :             }
     356             :         }
     357             : 
     358           6 :         if (did_cleanup)
     359           0 :             goto retry;
     360             :     }
     361             : 
     362             :     /*
     363             :      * If we reached the sync worker limit per subscription, just exit
     364             :      * silently as we might get here because of an otherwise harmless race
     365             :      * condition.
     366             :      */
     367         146 :     if (nsyncworkers >= max_sync_workers_per_subscription)
     368             :     {
     369           0 :         LWLockRelease(LogicalRepWorkerLock);
     370           6 :         return;
     371             :     }
     372             : 
     373             :     /*
      374             :      * However, if there are no more free worker slots, inform the user
      375             :      * before exiting.
     376             :      */
     377         146 :     if (worker == NULL)
     378             :     {
     379           6 :         LWLockRelease(LogicalRepWorkerLock);
     380           6 :         ereport(WARNING,
     381             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     382             :                  errmsg("out of logical replication worker slots"),
     383             :                  errhint("You might need to increase max_logical_replication_workers.")));
     384           6 :         return;
     385             :     }
     386             : 
     387             :     /* Prepare the worker slot. */
     388         140 :     worker->launch_time = now;
     389         140 :     worker->in_use = true;
     390         140 :     worker->generation++;
     391         140 :     worker->proc = NULL;
     392         140 :     worker->dbid = dbid;
     393         140 :     worker->userid = userid;
     394         140 :     worker->subid = subid;
     395         140 :     worker->relid = relid;
     396         140 :     worker->relstate = SUBREL_STATE_UNKNOWN;
     397         140 :     worker->relstate_lsn = InvalidXLogRecPtr;
     398         140 :     worker->last_lsn = InvalidXLogRecPtr;
     399         140 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
     400         140 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
     401         140 :     worker->reply_lsn = InvalidXLogRecPtr;
     402         140 :     TIMESTAMP_NOBEGIN(worker->reply_time);
     403             : 
     404             :     /* Before releasing lock, remember generation for future identification. */
     405         140 :     generation = worker->generation;
     406             : 
     407         140 :     LWLockRelease(LogicalRepWorkerLock);
     408             : 
     409             :     /* Register the new dynamic worker. */
     410         140 :     memset(&bgw, 0, sizeof(bgw));
     411         140 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     412             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     413         140 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     414         140 :     snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
     415         140 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
     416         140 :     if (OidIsValid(relid))
     417         102 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
     418             :                  "logical replication worker for subscription %u sync %u", subid, relid);
     419             :     else
     420          38 :         snprintf(bgw.bgw_name, BGW_MAXLEN,
     421             :                  "logical replication worker for subscription %u", subid);
     422         140 :     snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
     423             : 
     424         140 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
     425         140 :     bgw.bgw_notify_pid = MyProcPid;
     426         140 :     bgw.bgw_main_arg = Int32GetDatum(slot);
     427             : 
     428         140 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
     429             :     {
     430             :         /* Failed to start worker, so clean up the worker slot. */
     431           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     432             :         Assert(generation == worker->generation);
     433           0 :         logicalrep_worker_cleanup(worker);
     434           0 :         LWLockRelease(LogicalRepWorkerLock);
     435             : 
     436           0 :         ereport(WARNING,
     437             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     438             :                  errmsg("out of background worker slots"),
     439             :                  errhint("You might need to increase max_worker_processes.")));
     440           0 :         return;
     441             :     }
     442             : 
     443             :     /* Now wait until it attaches. */
     444         140 :     WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
     445             : }
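
Callers pass InvalidOid as relid to start the per-subscription apply worker,
and a table OID to start a table synchronization worker; a sketch under those
assumptions:

    /* Apply worker for the whole subscription. */
    logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner,
                             InvalidOid);

    /* Table synchronization worker for one relation. */
    logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner,
                             relid);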
     446             : 
     447             : /*
     448             :  * Stop the logical replication worker for subid/relid, if any, and wait until
     449             :  * it detaches from the slot.
     450             :  */
     451             : void
     452          36 : logicalrep_worker_stop(Oid subid, Oid relid)
     453             : {
     454             :     LogicalRepWorker *worker;
     455             :     uint16      generation;
     456             : 
     457          36 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     458             : 
     459          36 :     worker = logicalrep_worker_find(subid, relid, false);
     460             : 
     461             :     /* No worker, nothing to do. */
     462          36 :     if (!worker)
     463             :     {
     464          20 :         LWLockRelease(LogicalRepWorkerLock);
     465          20 :         return;
     466             :     }
     467             : 
     468             :     /*
      469             :      * Remember the generation of our worker so we can later check whether
      470             :      * what we see is still the same one.
     471             :      */
     472          16 :     generation = worker->generation;
     473             : 
     474             :     /*
     475             :      * If we found a worker but it does not have proc set then it is still
     476             :      * starting up; wait for it to finish starting and then kill it.
     477             :      */
     478          16 :     while (worker->in_use && !worker->proc)
     479             :     {
     480             :         int         rc;
     481             : 
     482           0 :         LWLockRelease(LogicalRepWorkerLock);
     483             : 
     484             :         /* Wait a bit --- we don't expect to have to wait long. */
     485           0 :         rc = WaitLatch(MyLatch,
     486             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     487             :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
     488             : 
     489           0 :         if (rc & WL_LATCH_SET)
     490             :         {
     491           0 :             ResetLatch(MyLatch);
     492           0 :             CHECK_FOR_INTERRUPTS();
     493             :         }
     494             : 
     495             :         /* Recheck worker status. */
     496           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     497             : 
     498             :         /*
     499             :          * Check whether the worker slot is no longer used, which would mean
     500             :          * that the worker has exited, or whether the worker generation is
     501             :          * different, meaning that a different worker has taken the slot.
     502             :          */
     503           0 :         if (!worker->in_use || worker->generation != generation)
     504             :         {
     505           0 :             LWLockRelease(LogicalRepWorkerLock);
     506           0 :             return;
     507             :         }
     508             : 
     509             :         /* Worker has assigned proc, so it has started. */
     510           0 :         if (worker->proc)
     511           0 :             break;
     512             :     }
     513             : 
     514             :     /* Now terminate the worker ... */
     515          16 :     kill(worker->proc->pid, SIGTERM);
     516             : 
     517             :     /* ... and wait for it to die. */
     518             :     for (;;)
     519          16 :     {
     520             :         int         rc;
     521             : 
     522             :         /* is it gone? */
     523          32 :         if (!worker->proc || worker->generation != generation)
     524             :             break;
     525             : 
     526          16 :         LWLockRelease(LogicalRepWorkerLock);
     527             : 
     528             :         /* Wait a bit --- we don't expect to have to wait long. */
     529          16 :         rc = WaitLatch(MyLatch,
     530             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     531             :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
     532             : 
     533          16 :         if (rc & WL_LATCH_SET)
     534             :         {
     535           0 :             ResetLatch(MyLatch);
     536           0 :             CHECK_FOR_INTERRUPTS();
     537             :         }
     538             : 
     539          16 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     540             :     }
     541             : 
     542          16 :     LWLockRelease(LogicalRepWorkerLock);
     543             : }
     544             : 
     545             : /*
      546             :  * Request that the worker for the specified sub/rel be stopped on commit.
     547             :  */
     548             : void
     549          20 : logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
     550             : {
     551          20 :     int         nestDepth = GetCurrentTransactionNestLevel();
     552             :     LogicalRepWorkerId *wid;
     553             :     MemoryContext oldctx;
     554             : 
     555             :     /* Make sure we store the info in context that survives until commit. */
     556          20 :     oldctx = MemoryContextSwitchTo(TopTransactionContext);
     557             : 
     558             :     /* Check that previous transactions were properly cleaned up. */
     559             :     Assert(on_commit_stop_workers == NULL ||
     560             :            nestDepth >= on_commit_stop_workers->nestDepth);
     561             : 
     562             :     /*
     563             :      * Push a new stack element if we don't already have one for the current
     564             :      * nestDepth.
     565             :      */
     566          20 :     if (on_commit_stop_workers == NULL ||
     567          16 :         nestDepth > on_commit_stop_workers->nestDepth)
     568             :     {
     569           4 :         StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
     570             : 
     571           4 :         newdata->nestDepth = nestDepth;
     572           4 :         newdata->workers = NIL;
     573           4 :         newdata->parent = on_commit_stop_workers;
     574           4 :         on_commit_stop_workers = newdata;
     575             :     }
     576             : 
     577             :     /*
     578             :      * Finally add a new worker into the worker list of the current
     579             :      * subtransaction.
     580             :      */
     581          20 :     wid = palloc(sizeof(LogicalRepWorkerId));
     582          20 :     wid->subid = subid;
     583          20 :     wid->relid = relid;
     584          40 :     on_commit_stop_workers->workers =
     585          20 :         lappend(on_commit_stop_workers->workers, wid);
     586             : 
     587          20 :     MemoryContextSwitchTo(oldctx);
     588          20 : }
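
A sketch of the intended call sequence (the first call site is hypothetical;
AtEOXact_ApplyLauncher below is invoked by the transaction machinery):

    /* In a transaction that detaches a relation from a subscription: */
    logicalrep_worker_stop_at_commit(subid, relid);

    /* ... later, at commit, xact.c runs: */
    AtEOXact_ApplyLauncher(true);   /* calls logicalrep_worker_stop() */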
     589             : 
     590             : /*
     591             :  * Wake up (using latch) any logical replication worker for specified sub/rel.
     592             :  */
     593             : void
     594          98 : logicalrep_worker_wakeup(Oid subid, Oid relid)
     595             : {
     596             :     LogicalRepWorker *worker;
     597             : 
     598          98 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     599             : 
     600          98 :     worker = logicalrep_worker_find(subid, relid, true);
     601             : 
     602          98 :     if (worker)
     603          98 :         logicalrep_worker_wakeup_ptr(worker);
     604             : 
     605          98 :     LWLockRelease(LogicalRepWorkerLock);
     606          98 : }
     607             : 
     608             : /*
     609             :  * Wake up (using latch) the specified logical replication worker.
     610             :  *
     611             :  * Caller must hold lock, else worker->proc could change under us.
     612             :  */
     613             : void
     614         294 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
     615             : {
     616             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     617             : 
     618         294 :     SetLatch(&worker->proc->procLatch);
     619         294 : }
     620             : 
     621             : /*
     622             :  * Attach to a slot.
     623             :  */
     624             : void
     625         162 : logicalrep_worker_attach(int slot)
     626             : {
     627             :     /* Block concurrent access. */
     628         162 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     629             : 
     630             :     Assert(slot >= 0 && slot < max_logical_replication_workers);
     631         162 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
     632             : 
     633         162 :     if (!MyLogicalRepWorker->in_use)
     634             :     {
     635           0 :         LWLockRelease(LogicalRepWorkerLock);
     636           0 :         ereport(ERROR,
     637             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     638             :                  errmsg("logical replication worker slot %d is empty, cannot attach",
     639             :                         slot)));
     640             :     }
     641             : 
     642         162 :     if (MyLogicalRepWorker->proc)
     643             :     {
     644           0 :         LWLockRelease(LogicalRepWorkerLock);
     645           0 :         ereport(ERROR,
     646             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     647             :                  errmsg("logical replication worker slot %d is already used by "
     648             :                         "another worker, cannot attach", slot)));
     649             :     }
     650             : 
     651         162 :     MyLogicalRepWorker->proc = MyProc;
     652         162 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
     653             : 
     654         162 :     LWLockRelease(LogicalRepWorkerLock);
     655         162 : }
     656             : 
     657             : /*
     658             :  * Detach the worker (cleans up the worker info).
     659             :  */
     660             : static void
     661         162 : logicalrep_worker_detach(void)
     662             : {
     663             :     /* Block concurrent access. */
     664         162 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
     665             : 
     666         162 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
     667             : 
     668         162 :     LWLockRelease(LogicalRepWorkerLock);
     669         162 : }
     670             : 
     671             : /*
     672             :  * Clean up worker info.
     673             :  */
     674             : static void
     675         162 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
     676             : {
     677             :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
     678             : 
     679         162 :     worker->in_use = false;
     680         162 :     worker->proc = NULL;
     681         162 :     worker->dbid = InvalidOid;
     682         162 :     worker->userid = InvalidOid;
     683         162 :     worker->subid = InvalidOid;
     684         162 :     worker->relid = InvalidOid;
     685         162 : }
     686             : 
     687             : /*
     688             :  * Cleanup function for logical replication launcher.
     689             :  *
     690             :  * Called on logical replication launcher exit.
     691             :  */
     692             : static void
     693         366 : logicalrep_launcher_onexit(int code, Datum arg)
     694             : {
     695         366 :     LogicalRepCtx->launcher_pid = 0;
     696         366 : }
     697             : 
     698             : /*
     699             :  * Cleanup function.
     700             :  *
     701             :  * Called on logical replication worker exit.
     702             :  */
     703             : static void
     704         162 : logicalrep_worker_onexit(int code, Datum arg)
     705             : {
     706             :     /* Disconnect gracefully from the remote side. */
     707         162 :     if (wrconn)
     708         152 :         walrcv_disconnect(wrconn);
     709             : 
     710         162 :     logicalrep_worker_detach();
     711             : 
     712         162 :     ApplyLauncherWakeup();
     713         162 : }
     714             : 
     715             : /*
     716             :  * Count the number of registered (not necessarily running) sync workers
     717             :  * for a subscription.
     718             :  */
     719             : int
     720         722 : logicalrep_sync_worker_count(Oid subid)
     721             : {
     722             :     int         i;
     723         722 :     int         res = 0;
     724             : 
     725             :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
     726             : 
     727             :     /* Search for attached worker for a given subscription id. */
     728        3610 :     for (i = 0; i < max_logical_replication_workers; i++)
     729             :     {
     730        2888 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
     731             : 
     732        2888 :         if (w->subid == subid && OidIsValid(w->relid))
     733        1028 :             res++;
     734             :     }
     735             : 
     736         722 :     return res;
     737             : }
     738             : 
     739             : /*
     740             :  * ApplyLauncherShmemSize
     741             :  *      Compute space needed for replication launcher shared memory
     742             :  */
     743             : Size
     744        6514 : ApplyLauncherShmemSize(void)
     745             : {
     746             :     Size        size;
     747             : 
     748             :     /*
     749             :      * Need the fixed struct and the array of LogicalRepWorker.
     750             :      */
     751        6514 :     size = sizeof(LogicalRepCtxStruct);
     752        6514 :     size = MAXALIGN(size);
     753        6514 :     size = add_size(size, mul_size(max_logical_replication_workers,
     754             :                                    sizeof(LogicalRepWorker)));
     755        6514 :     return size;
     756             : }
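
For example, with the default max_logical_replication_workers = 4 this
reserves MAXALIGN(sizeof(LogicalRepCtxStruct)) + 4 * sizeof(LogicalRepWorker)
bytes; the exact byte count depends on the platform's struct layout and
alignment.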
     757             : 
     758             : /*
     759             :  * ApplyLauncherRegister
     760             :  *      Register a background worker running the logical replication launcher.
     761             :  */
     762             : void
     763         724 : ApplyLauncherRegister(void)
     764             : {
     765             :     BackgroundWorker bgw;
     766             : 
     767         724 :     if (max_logical_replication_workers == 0)
     768           0 :         return;
     769             : 
     770         724 :     memset(&bgw, 0, sizeof(bgw));
     771         724 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
     772             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     773         724 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
     774         724 :     snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
     775         724 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
     776         724 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
     777             :              "logical replication launcher");
     778         724 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
     779             :              "logical replication launcher");
     780         724 :     bgw.bgw_restart_time = 5;
     781         724 :     bgw.bgw_notify_pid = 0;
     782         724 :     bgw.bgw_main_arg = (Datum) 0;
     783             : 
     784         724 :     RegisterBackgroundWorker(&bgw);
     785             : }
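
The same BackgroundWorker API is available to extensions.  A hypothetical
sketch (my_extension and my_worker_main are placeholder names) registering a
similar static worker from _PG_init(), assuming the library is loaded via
shared_preload_libraries:

    #include "postgres.h"
    #include "postmaster/bgworker.h"

    void
    _PG_init(void)
    {
        BackgroundWorker bgw;

        memset(&bgw, 0, sizeof(bgw));
        bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
        bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
        snprintf(bgw.bgw_library_name, BGW_MAXLEN, "my_extension");
        snprintf(bgw.bgw_function_name, BGW_MAXLEN, "my_worker_main");
        snprintf(bgw.bgw_name, BGW_MAXLEN, "my extension worker");
        snprintf(bgw.bgw_type, BGW_MAXLEN, "my extension worker");
        bgw.bgw_restart_time = 5;   /* seconds between crash restarts */
        bgw.bgw_notify_pid = 0;     /* no process to notify at startup */
        RegisterBackgroundWorker(&bgw);
    }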
     786             : 
     787             : /*
     788             :  * ApplyLauncherShmemInit
     789             :  *      Allocate and initialize replication launcher shared memory
     790             :  */
     791             : void
     792        2170 : ApplyLauncherShmemInit(void)
     793             : {
     794             :     bool        found;
     795             : 
     796        2170 :     LogicalRepCtx = (LogicalRepCtxStruct *)
     797        2170 :         ShmemInitStruct("Logical Replication Launcher Data",
     798             :                         ApplyLauncherShmemSize(),
     799             :                         &found);
     800             : 
     801        2170 :     if (!found)
     802             :     {
     803             :         int         slot;
     804             : 
     805        2170 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
     806             : 
     807             :         /* Initialize memory and spin locks for each worker slot. */
     808       10850 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
     809             :         {
     810        8680 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
     811             : 
     812        8680 :             memset(worker, 0, sizeof(LogicalRepWorker));
     813        8680 :             SpinLockInit(&worker->relmutex);
     814             :         }
     815             :     }
     816        2170 : }
     817             : 
     818             : /*
      819             :  * Check whether the current transaction has manipulated logical replication
     820             :  * workers.
     821             :  */
     822             : bool
     823          76 : XactManipulatesLogicalReplicationWorkers(void)
     824             : {
     825          76 :     return (on_commit_stop_workers != NULL);
     826             : }
     827             : 
     828             : /*
      829             :  * Wake up the launcher on commit if requested.
     830             :  */
     831             : void
     832      495008 : AtEOXact_ApplyLauncher(bool isCommit)
     833             : {
     834             : 
     835             :     Assert(on_commit_stop_workers == NULL ||
     836             :            (on_commit_stop_workers->nestDepth == 1 &&
     837             :             on_commit_stop_workers->parent == NULL));
     838             : 
     839      495008 :     if (isCommit)
     840             :     {
     841             :         ListCell   *lc;
     842             : 
     843      474666 :         if (on_commit_stop_workers != NULL)
     844             :         {
     845           4 :             List       *workers = on_commit_stop_workers->workers;
     846             : 
     847          24 :             foreach(lc, workers)
     848             :             {
     849          20 :                 LogicalRepWorkerId *wid = lfirst(lc);
     850             : 
     851          20 :                 logicalrep_worker_stop(wid->subid, wid->relid);
     852             :             }
     853             :         }
     854             : 
     855      474666 :         if (on_commit_launcher_wakeup)
     856          44 :             ApplyLauncherWakeup();
     857             :     }
     858             : 
     859             :     /*
      860             :      * No need to pfree on_commit_stop_workers.  It was allocated in the
      861             :      * transaction memory context, which is going to be cleaned up soon.
     862             :      */
     863      495008 :     on_commit_stop_workers = NULL;
     864      495008 :     on_commit_launcher_wakeup = false;
     865      495008 : }
     866             : 
     867             : /*
     868             :  * On commit, merge the current on_commit_stop_workers list into the
     869             :  * immediate parent, if present.
     870             :  * On rollback, discard the current on_commit_stop_workers list.
      871             :  * Either way, pop the stack.
     872             :  */
     873             : void
     874        7646 : AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
     875             : {
     876             :     StopWorkersData *parent;
     877             : 
     878             :     /* Exit immediately if there's no work to do at this level. */
     879        7646 :     if (on_commit_stop_workers == NULL ||
     880           0 :         on_commit_stop_workers->nestDepth < nestDepth)
     881        7646 :         return;
     882             : 
     883             :     Assert(on_commit_stop_workers->nestDepth == nestDepth);
     884             : 
     885           0 :     parent = on_commit_stop_workers->parent;
     886             : 
     887           0 :     if (isCommit)
     888             :     {
     889             :         /*
     890             :          * If the upper stack element is not an immediate parent
     891             :          * subtransaction, just decrement the notional nesting depth without
     892             :          * doing any real work.  Else, we need to merge the current workers
     893             :          * list into the parent.
     894             :          */
     895           0 :         if (!parent || parent->nestDepth < nestDepth - 1)
     896             :         {
     897           0 :             on_commit_stop_workers->nestDepth--;
     898           0 :             return;
     899             :         }
     900             : 
     901           0 :         parent->workers =
     902           0 :             list_concat(parent->workers, on_commit_stop_workers->workers);
     903             :     }
     904             :     else
     905             :     {
     906             :         /*
     907             :          * Abandon everything that was done at this nesting level.  Explicitly
     908             :          * free memory to avoid a transaction-lifespan leak.
     909             :          */
     910           0 :         list_free_deep(on_commit_stop_workers->workers);
     911             :     }
     912             : 
     913             :     /*
      914             :      * We have taken care of the current subtransaction workers list for both
      915             :      * abort and commit, so we are ready to pop the stack.
     916             :      */
     917           0 :     pfree(on_commit_stop_workers);
     918           0 :     on_commit_stop_workers = parent;
     919             : }
     920             : 
     921             : /*
     922             :  * Request wakeup of the launcher on commit of the transaction.
     923             :  *
      924             :  * This is used to signal the launcher to stop sleeping and process the
      925             :  * subscriptions when the current transaction commits. It should be used
      926             :  * when a new tuple has been added to the pg_subscription catalog.
      927             :  */
     928             : void
     929          44 : ApplyLauncherWakeupAtCommit(void)
     930             : {
     931          44 :     if (!on_commit_launcher_wakeup)
     932          44 :         on_commit_launcher_wakeup = true;
     933          44 : }
     934             : 
     935             : static void
     936         206 : ApplyLauncherWakeup(void)
     937             : {
     938         206 :     if (LogicalRepCtx->launcher_pid != 0)
     939         194 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
     940         206 : }
     941             : 
     942             : /*
     943             :  * Main loop for the apply launcher process.
     944             :  */
     945             : void
     946         366 : ApplyLauncherMain(Datum main_arg)
     947             : {
     948         366 :     TimestampTz last_start_time = 0;
     949             : 
     950         366 :     ereport(DEBUG1,
     951             :             (errmsg("logical replication launcher started")));
     952             : 
     953         366 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
     954             : 
     955             :     Assert(LogicalRepCtx->launcher_pid == 0);
     956         366 :     LogicalRepCtx->launcher_pid = MyProcPid;
     957             : 
     958             :     /* Establish signal handlers. */
      959         366 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     960         366 :     pqsignal(SIGTERM, die);
     961         366 :     BackgroundWorkerUnblockSignals();
     962             : 
     963             :     /*
     964             :      * Establish connection to nailed catalogs (we only ever access
     965             :      * pg_subscription).
     966             :      */
     967         366 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
     968             : 
     969             :     /* Enter main loop */
     970             :     for (;;)
     971        1740 :     {
     972             :         int         rc;
     973             :         List       *sublist;
     974             :         ListCell   *lc;
     975             :         MemoryContext subctx;
     976             :         MemoryContext oldctx;
     977             :         TimestampTz now;
     978        2106 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
     979             : 
     980        2106 :         CHECK_FOR_INTERRUPTS();
     981             : 
     982        2106 :         now = GetCurrentTimestamp();
     983             : 
     984             :         /* Limit the start retry to once a wal_retrieve_retry_interval */
     985        2106 :         if (TimestampDifferenceExceeds(last_start_time, now,
     986             :                                        wal_retrieve_retry_interval))
     987             :         {
     988             :             /* Use temporary context for the database list and worker info. */
     989        2060 :             subctx = AllocSetContextCreate(TopMemoryContext,
     990             :                                            "Logical Replication Launcher sublist",
     991             :                                            ALLOCSET_DEFAULT_SIZES);
     992        2060 :             oldctx = MemoryContextSwitchTo(subctx);
     993             : 
     994             :             /* search for subscriptions to start or stop. */
     995        2060 :             sublist = get_subscription_list();
     996             : 
     997             :             /* Start the missing workers for enabled subscriptions. */
     998        2174 :             foreach(lc, sublist)
     999             :             {
    1000         114 :                 Subscription *sub = (Subscription *) lfirst(lc);
    1001             :                 LogicalRepWorker *w;
    1002             : 
    1003         114 :                 if (!sub->enabled)
    1004           6 :                     continue;
    1005             : 
    1006         108 :                 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1007         108 :                 w = logicalrep_worker_find(sub->oid, InvalidOid, false);
    1008         108 :                 LWLockRelease(LogicalRepWorkerLock);
    1009             : 
    1010         108 :                 if (w == NULL)
    1011             :                 {
    1012          38 :                     last_start_time = now;
    1013          38 :                     wait_time = wal_retrieve_retry_interval;
    1014             : 
    1015          38 :                     logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
    1016             :                                              sub->owner, InvalidOid);
    1017             :                 }
    1018             :             }
    1019             : 
    1020             :             /* Switch back to original memory context. */
    1021        2060 :             MemoryContextSwitchTo(oldctx);
    1022             :             /* Clean the temporary memory. */
    1023        2060 :             MemoryContextDelete(subctx);
    1024             :         }
    1025             :         else
    1026             :         {
    1027             :             /*
     1028             :              * The wait in the previous cycle was interrupted less than
     1029             :              * wal_retrieve_retry_interval after the last worker was started.
     1030             :              * This usually means the worker crashed, so we should retry
     1031             :              * after wal_retrieve_retry_interval again.
    1032             :              */
    1033          46 :             wait_time = wal_retrieve_retry_interval;
    1034             :         }
    1035             : 
    1036             :         /* Wait for more work. */
    1037        2106 :         rc = WaitLatch(MyLatch,
    1038             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1039             :                        wait_time,
    1040             :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
    1041             : 
    1042        2100 :         if (rc & WL_LATCH_SET)
    1043             :         {
    1044        2080 :             ResetLatch(MyLatch);
    1045        2080 :             CHECK_FOR_INTERRUPTS();
    1046             :         }
    1047             : 
    1048        1740 :         if (ConfigReloadPending)
    1049             :         {
    1050           0 :             ConfigReloadPending = false;
    1051           0 :             ProcessConfigFile(PGC_SIGHUP);
    1052             :         }
    1053             :     }
    1054             : 
    1055             :     /* Not reachable */
    1056             : }
    1057             : 
    1058             : /*
    1059             :  * Is current process the logical replication launcher?
    1060             :  */
    1061             : bool
    1062         376 : IsLogicalLauncher(void)
    1063             : {
    1064         376 :     return LogicalRepCtx->launcher_pid == MyProcPid;
    1065             : }
    1066             : 
    1067             : /*
    1068             :  * Returns state of the subscriptions.
    1069             :  */
    1070             : Datum
    1071           2 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
    1072             : {
    1073             : #define PG_STAT_GET_SUBSCRIPTION_COLS   8
    1074           2 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    1075             :     int         i;
    1076           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1077             :     TupleDesc   tupdesc;
    1078             :     Tuplestorestate *tupstore;
    1079             :     MemoryContext per_query_ctx;
    1080             :     MemoryContext oldcontext;
    1081             : 
    1082             :     /* check to see if caller supports us returning a tuplestore */
    1083           2 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
    1084           0 :         ereport(ERROR,
    1085             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1086             :                  errmsg("set-valued function called in context that cannot accept a set")));
    1087           2 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
    1088           0 :         ereport(ERROR,
    1089             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1090             :                  errmsg("materialize mode required, but it is not allowed in this context")));
    1091             : 
    1092             :     /* Build a tuple descriptor for our result type */
    1093           2 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
    1094           0 :         elog(ERROR, "return type must be a row type");
    1095             : 
    1096           2 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    1097           2 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
    1098             : 
    1099           2 :     tupstore = tuplestore_begin_heap(true, false, work_mem);
    1100           2 :     rsinfo->returnMode = SFRM_Materialize;
    1101           2 :     rsinfo->setResult = tupstore;
    1102           2 :     rsinfo->setDesc = tupdesc;
    1103             : 
    1104           2 :     MemoryContextSwitchTo(oldcontext);
    1105             : 
    1106             :     /* Make sure we get consistent view of the workers. */
    1107           2 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    1108             : 
     1109          12 :     for (i = 0; i < max_logical_replication_workers; i++)
    1110             :     {
    1111             :         /* for each row */
    1112             :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS];
    1113             :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
    1114             :         int         worker_pid;
    1115             :         LogicalRepWorker worker;
    1116             : 
    1117          10 :         memcpy(&worker, &LogicalRepCtx->workers[i],
    1118             :                sizeof(LogicalRepWorker));
    1119          10 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
    1120           6 :             continue;
    1121             : 
    1122           4 :         if (OidIsValid(subid) && worker.subid != subid)
    1123           0 :             continue;
    1124             : 
    1125           4 :         worker_pid = worker.proc->pid;
    1126             : 
    1127          36 :         MemSet(values, 0, sizeof(values));
    1128           8 :         MemSet(nulls, 0, sizeof(nulls));
    1129             : 
    1130           4 :         values[0] = ObjectIdGetDatum(worker.subid);
    1131           4 :         if (OidIsValid(worker.relid))
    1132           0 :             values[1] = ObjectIdGetDatum(worker.relid);
    1133             :         else
    1134           4 :             nulls[1] = true;
    1135           4 :         values[2] = Int32GetDatum(worker_pid);
    1136           4 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
    1137           2 :             nulls[3] = true;
    1138             :         else
    1139           2 :             values[3] = LSNGetDatum(worker.last_lsn);
    1140           4 :         if (worker.last_send_time == 0)
    1141           0 :             nulls[4] = true;
    1142             :         else
    1143           4 :             values[4] = TimestampTzGetDatum(worker.last_send_time);
    1144           4 :         if (worker.last_recv_time == 0)
    1145           0 :             nulls[5] = true;
    1146             :         else
    1147           4 :             values[5] = TimestampTzGetDatum(worker.last_recv_time);
    1148           4 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
    1149           2 :             nulls[6] = true;
    1150             :         else
    1151           2 :             values[6] = LSNGetDatum(worker.reply_lsn);
    1152           4 :         if (worker.reply_time == 0)
    1153           0 :             nulls[7] = true;
    1154             :         else
    1155           4 :             values[7] = TimestampTzGetDatum(worker.reply_time);
    1156             : 
    1157           4 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    1158             : 
    1159             :         /*
    1160             :          * If only a single subscription was requested, and we found it,
    1161             :          * break.
    1162             :          */
    1163           4 :         if (OidIsValid(subid))
    1164           0 :             break;
    1165             :     }
    1166             : 
    1167           2 :     LWLockRelease(LogicalRepWorkerLock);
    1168             : 
    1169             :     /* clean up and return the tuplestore */
    1170             :     tuplestore_donestoring(tupstore);
    1171             : 
    1172           2 :     return (Datum) 0;
    1173             : }
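
This set-returning function backs the pg_stat_subscription system view, so
the usual way to read it is SELECT * FROM pg_stat_subscription; one row is
returned per running apply or sync worker, with relid non-null only for
table synchronization workers.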

Generated by: LCOV version 1.13