LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 183 192 95.3 %
Date: 2025-01-18 03:14:54 Functions: 12 12 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_subscription.c
       4             :  *      replication subscriptions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_subscription.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "catalog/indexing.h"
      22             : #include "catalog/pg_subscription.h"
      23             : #include "catalog/pg_subscription_rel.h"
      24             : #include "catalog/pg_type.h"
      25             : #include "miscadmin.h"
      26             : #include "storage/lmgr.h"
      27             : #include "utils/array.h"
      28             : #include "utils/builtins.h"
      29             : #include "utils/fmgroids.h"
      30             : #include "utils/lsyscache.h"
      31             : #include "utils/pg_lsn.h"
      32             : #include "utils/rel.h"
      33             : #include "utils/syscache.h"
      34             : 
      35             : static List *textarray_to_stringlist(ArrayType *textarray);
      36             : 
      37             : /*
      38             :  * Add a comma-separated list of publication names to the 'dest' string.
      39             :  */
      40             : void
      41         894 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
      42             : {
      43             :     ListCell   *lc;
      44         894 :     bool        first = true;
      45             : 
      46             :     Assert(publications != NIL);
      47             : 
      48        2316 :     foreach(lc, publications)
      49             :     {
      50        1422 :         char       *pubname = strVal(lfirst(lc));
      51             : 
      52        1422 :         if (first)
      53         894 :             first = false;
      54             :         else
      55         528 :             appendStringInfoString(dest, ", ");
      56             : 
      57        1422 :         if (quote_literal)
      58        1410 :             appendStringInfoString(dest, quote_literal_cstr(pubname));
      59             :         else
      60             :         {
      61          12 :             appendStringInfoChar(dest, '"');
      62          12 :             appendStringInfoString(dest, pubname);
      63          12 :             appendStringInfoChar(dest, '"');
      64             :         }
      65             :     }
      66         894 : }
      67             : 
      68             : /*
      69             :  * Fetch the subscription from the syscache.
      70             :  */
      71             : Subscription *
      72        1374 : GetSubscription(Oid subid, bool missing_ok)
      73             : {
      74             :     HeapTuple   tup;
      75             :     Subscription *sub;
      76             :     Form_pg_subscription subform;
      77             :     Datum       datum;
      78             :     bool        isnull;
      79             : 
      80        1374 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      81             : 
      82        1372 :     if (!HeapTupleIsValid(tup))
      83             :     {
      84           0 :         if (missing_ok)
      85           0 :             return NULL;
      86             : 
      87           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
      88             :     }
      89             : 
      90        1372 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
      91             : 
      92        1372 :     sub = (Subscription *) palloc(sizeof(Subscription));
      93        1372 :     sub->oid = subid;
      94        1372 :     sub->dbid = subform->subdbid;
      95        1372 :     sub->skiplsn = subform->subskiplsn;
      96        1372 :     sub->name = pstrdup(NameStr(subform->subname));
      97        1372 :     sub->owner = subform->subowner;
      98        1372 :     sub->enabled = subform->subenabled;
      99        1372 :     sub->binary = subform->subbinary;
     100        1372 :     sub->stream = subform->substream;
     101        1372 :     sub->twophasestate = subform->subtwophasestate;
     102        1372 :     sub->disableonerr = subform->subdisableonerr;
     103        1372 :     sub->passwordrequired = subform->subpasswordrequired;
     104        1372 :     sub->runasowner = subform->subrunasowner;
     105        1372 :     sub->failover = subform->subfailover;
     106             : 
     107             :     /* Get conninfo */
     108        1372 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     109             :                                    tup,
     110             :                                    Anum_pg_subscription_subconninfo);
     111        1372 :     sub->conninfo = TextDatumGetCString(datum);
     112             : 
     113             :     /* Get slotname */
     114        1372 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     115             :                             tup,
     116             :                             Anum_pg_subscription_subslotname,
     117             :                             &isnull);
     118        1372 :     if (!isnull)
     119        1306 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
     120             :     else
     121          66 :         sub->slotname = NULL;
     122             : 
     123             :     /* Get synccommit */
     124        1372 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     125             :                                    tup,
     126             :                                    Anum_pg_subscription_subsynccommit);
     127        1372 :     sub->synccommit = TextDatumGetCString(datum);
     128             : 
     129             :     /* Get publications */
     130        1372 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     131             :                                    tup,
     132             :                                    Anum_pg_subscription_subpublications);
     133        1372 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     134             : 
     135             :     /* Get origin */
     136        1372 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     137             :                                    tup,
     138             :                                    Anum_pg_subscription_suborigin);
     139        1372 :     sub->origin = TextDatumGetCString(datum);
     140             : 
     141             :     /* Is the subscription owner a superuser? */
     142        1372 :     sub->ownersuperuser = superuser_arg(sub->owner);
     143             : 
     144        1372 :     ReleaseSysCache(tup);
     145             : 
     146        1372 :     return sub;
     147             : }
     148             : 
     149             : /*
     150             :  * Return number of subscriptions defined in given database.
     151             :  * Used by dropdb() to check if database can indeed be dropped.
     152             :  */
     153             : int
     154          68 : CountDBSubscriptions(Oid dbid)
     155             : {
     156          68 :     int         nsubs = 0;
     157             :     Relation    rel;
     158             :     ScanKeyData scankey;
     159             :     SysScanDesc scan;
     160             :     HeapTuple   tup;
     161             : 
     162          68 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     163             : 
     164          68 :     ScanKeyInit(&scankey,
     165             :                 Anum_pg_subscription_subdbid,
     166             :                 BTEqualStrategyNumber, F_OIDEQ,
     167             :                 ObjectIdGetDatum(dbid));
     168             : 
     169          68 :     scan = systable_beginscan(rel, InvalidOid, false,
     170             :                               NULL, 1, &scankey);
     171             : 
     172          68 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     173           0 :         nsubs++;
     174             : 
     175          68 :     systable_endscan(scan);
     176             : 
     177          68 :     table_close(rel, NoLock);
     178             : 
     179          68 :     return nsubs;
     180             : }
     181             : 
     182             : /*
     183             :  * Free memory allocated by subscription struct.
     184             :  */
     185             : void
     186          66 : FreeSubscription(Subscription *sub)
     187             : {
     188          66 :     pfree(sub->name);
     189          66 :     pfree(sub->conninfo);
     190          66 :     if (sub->slotname)
     191          66 :         pfree(sub->slotname);
     192          66 :     list_free_deep(sub->publications);
     193          66 :     pfree(sub);
     194          66 : }
     195             : 
     196             : /*
     197             :  * Disable the given subscription.
     198             :  */
     199             : void
     200           8 : DisableSubscription(Oid subid)
     201             : {
     202             :     Relation    rel;
     203             :     bool        nulls[Natts_pg_subscription];
     204             :     bool        replaces[Natts_pg_subscription];
     205             :     Datum       values[Natts_pg_subscription];
     206             :     HeapTuple   tup;
     207             : 
     208             :     /* Look up the subscription in the catalog */
     209           8 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     210           8 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     211             : 
     212           8 :     if (!HeapTupleIsValid(tup))
     213           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     214             : 
     215           8 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     216             : 
     217             :     /* Form a new tuple. */
     218           8 :     memset(values, 0, sizeof(values));
     219           8 :     memset(nulls, false, sizeof(nulls));
     220           8 :     memset(replaces, false, sizeof(replaces));
     221             : 
     222             :     /* Set the subscription to disabled. */
     223           8 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     224           8 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
     225             : 
     226             :     /* Update the catalog */
     227           8 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     228             :                             replaces);
     229           8 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     230           8 :     heap_freetuple(tup);
     231             : 
     232           8 :     table_close(rel, NoLock);
     233           8 : }
     234             : 
     235             : /*
     236             :  * Convert text array to list of strings.
     237             :  *
     238             :  * Note: the resulting list of strings is pallocated here.
     239             :  */
     240             : static List *
     241        1372 : textarray_to_stringlist(ArrayType *textarray)
     242             : {
     243             :     Datum      *elems;
     244             :     int         nelems,
     245             :                 i;
     246        1372 :     List       *res = NIL;
     247             : 
     248        1372 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
     249             : 
     250        1372 :     if (nelems == 0)
     251           0 :         return NIL;
     252             : 
     253        3404 :     for (i = 0; i < nelems; i++)
     254        2032 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     255             : 
     256        1372 :     return res;
     257             : }
     258             : 
     259             : /*
     260             :  * Add new state record for a subscription table.
     261             :  *
     262             :  * If retain_lock is true, then don't release the locks taken in this function.
     263             :  * We normally release the locks at the end of transaction but in binary-upgrade
     264             :  * mode, we expect to release those immediately.
     265             :  */
     266             : void
     267         380 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     268             :                         XLogRecPtr sublsn, bool retain_lock)
     269             : {
     270             :     Relation    rel;
     271             :     HeapTuple   tup;
     272             :     bool        nulls[Natts_pg_subscription_rel];
     273             :     Datum       values[Natts_pg_subscription_rel];
     274             : 
     275         380 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     276             : 
     277         380 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     278             : 
     279             :     /* Try finding existing mapping. */
     280         380 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     281             :                               ObjectIdGetDatum(relid),
     282             :                               ObjectIdGetDatum(subid));
     283         380 :     if (HeapTupleIsValid(tup))
     284           0 :         elog(ERROR, "subscription table %u in subscription %u already exists",
     285             :              relid, subid);
     286             : 
     287             :     /* Form the tuple. */
     288         380 :     memset(values, 0, sizeof(values));
     289         380 :     memset(nulls, false, sizeof(nulls));
     290         380 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     291         380 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     292         380 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     293         380 :     if (sublsn != InvalidXLogRecPtr)
     294           2 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     295             :     else
     296         378 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     297             : 
     298         380 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     299             : 
     300             :     /* Insert tuple into catalog. */
     301         380 :     CatalogTupleInsert(rel, tup);
     302             : 
     303         380 :     heap_freetuple(tup);
     304             : 
     305             :     /* Cleanup. */
     306         380 :     if (retain_lock)
     307             :     {
     308         376 :         table_close(rel, NoLock);
     309             :     }
     310             :     else
     311             :     {
     312           4 :         table_close(rel, RowExclusiveLock);
     313           4 :         UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     314             :     }
     315         380 : }
     316             : 
     317             : /*
     318             :  * Update the state of a subscription table.
     319             :  */
     320             : void
     321        1404 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     322             :                            XLogRecPtr sublsn)
     323             : {
     324             :     Relation    rel;
     325             :     HeapTuple   tup;
     326             :     bool        nulls[Natts_pg_subscription_rel];
     327             :     Datum       values[Natts_pg_subscription_rel];
     328             :     bool        replaces[Natts_pg_subscription_rel];
     329             : 
     330        1404 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     331             : 
     332        1402 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     333             : 
     334             :     /* Try finding existing mapping. */
     335        1402 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     336             :                               ObjectIdGetDatum(relid),
     337             :                               ObjectIdGetDatum(subid));
     338        1402 :     if (!HeapTupleIsValid(tup))
     339           0 :         elog(ERROR, "subscription table %u in subscription %u does not exist",
     340             :              relid, subid);
     341             : 
     342             :     /* Update the tuple. */
     343        1402 :     memset(values, 0, sizeof(values));
     344        1402 :     memset(nulls, false, sizeof(nulls));
     345        1402 :     memset(replaces, false, sizeof(replaces));
     346             : 
     347        1402 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     348        1402 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     349             : 
     350        1402 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     351        1402 :     if (sublsn != InvalidXLogRecPtr)
     352         688 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     353             :     else
     354         714 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     355             : 
     356        1402 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     357             :                             replaces);
     358             : 
     359             :     /* Update the catalog. */
     360        1402 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     361             : 
     362             :     /* Cleanup. */
     363        1402 :     table_close(rel, NoLock);
     364        1402 : }
     365             : 
     366             : /*
     367             :  * Get state of subscription table.
     368             :  *
     369             :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     370             :  */
     371             : char
     372        2180 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     373             : {
     374             :     HeapTuple   tup;
     375             :     char        substate;
     376             :     bool        isnull;
     377             :     Datum       d;
     378             :     Relation    rel;
     379             : 
     380             :     /*
     381             :      * This is to avoid the race condition with AlterSubscription which tries
     382             :      * to remove this relstate.
     383             :      */
     384        2180 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     385             : 
     386             :     /* Try finding the mapping. */
     387        2180 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     388             :                           ObjectIdGetDatum(relid),
     389             :                           ObjectIdGetDatum(subid));
     390             : 
     391        2180 :     if (!HeapTupleIsValid(tup))
     392             :     {
     393          54 :         table_close(rel, AccessShareLock);
     394          54 :         *sublsn = InvalidXLogRecPtr;
     395          54 :         return SUBREL_STATE_UNKNOWN;
     396             :     }
     397             : 
     398             :     /* Get the state. */
     399        2126 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     400             : 
     401             :     /* Get the LSN */
     402        2126 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     403             :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     404        2126 :     if (isnull)
     405        1154 :         *sublsn = InvalidXLogRecPtr;
     406             :     else
     407         972 :         *sublsn = DatumGetLSN(d);
     408             : 
     409             :     /* Cleanup */
     410        2126 :     ReleaseSysCache(tup);
     411             : 
     412        2126 :     table_close(rel, AccessShareLock);
     413             : 
     414        2126 :     return substate;
     415             : }
     416             : 
     417             : /*
     418             :  * Drop subscription relation mapping. These can be for a particular
     419             :  * subscription, or for a particular relation, or both.
     420             :  */
     421             : void
     422       45678 : RemoveSubscriptionRel(Oid subid, Oid relid)
     423             : {
     424             :     Relation    rel;
     425             :     TableScanDesc scan;
     426             :     ScanKeyData skey[2];
     427             :     HeapTuple   tup;
     428       45678 :     int         nkeys = 0;
     429             : 
     430       45678 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     431             : 
     432       45678 :     if (OidIsValid(subid))
     433             :     {
     434         232 :         ScanKeyInit(&skey[nkeys++],
     435             :                     Anum_pg_subscription_rel_srsubid,
     436             :                     BTEqualStrategyNumber,
     437             :                     F_OIDEQ,
     438             :                     ObjectIdGetDatum(subid));
     439             :     }
     440             : 
     441       45678 :     if (OidIsValid(relid))
     442             :     {
     443       45482 :         ScanKeyInit(&skey[nkeys++],
     444             :                     Anum_pg_subscription_rel_srrelid,
     445             :                     BTEqualStrategyNumber,
     446             :                     F_OIDEQ,
     447             :                     ObjectIdGetDatum(relid));
     448             :     }
     449             : 
     450             :     /* Do the search and delete what we found. */
     451       45678 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     452       45884 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     453             :     {
     454             :         Form_pg_subscription_rel subrel;
     455             : 
     456         206 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     457             : 
     458             :         /*
     459             :          * We don't allow to drop the relation mapping when the table
     460             :          * synchronization is in progress unless the caller updates the
     461             :          * corresponding subscription as well. This is to ensure that we don't
     462             :          * leave tablesync slots or origins in the system when the
     463             :          * corresponding table is dropped.
     464             :          */
     465         206 :         if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
     466             :         {
     467           0 :             ereport(ERROR,
     468             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     469             :                      errmsg("could not drop relation mapping for subscription \"%s\"",
     470             :                             get_subscription_name(subrel->srsubid, false)),
     471             :                      errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
     472             :                                get_rel_name(relid), subrel->srsubstate),
     473             : 
     474             :             /*
     475             :              * translator: first %s is a SQL ALTER command and second %s is a
     476             :              * SQL DROP command
     477             :              */
     478             :                      errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
     479             :                              "ALTER SUBSCRIPTION ... ENABLE",
     480             :                              "DROP SUBSCRIPTION ...")));
     481             :         }
     482             : 
     483         206 :         CatalogTupleDelete(rel, &tup->t_self);
     484             :     }
     485       45678 :     table_endscan(scan);
     486             : 
     487       45678 :     table_close(rel, RowExclusiveLock);
     488       45678 : }
     489             : 
     490             : /*
     491             :  * Does the subscription have any relations?
     492             :  *
     493             :  * Use this function only to know true/false, and when you have no need for the
     494             :  * List returned by GetSubscriptionRelations.
     495             :  */
     496             : bool
     497         420 : HasSubscriptionRelations(Oid subid)
     498             : {
     499             :     Relation    rel;
     500             :     ScanKeyData skey[1];
     501             :     SysScanDesc scan;
     502             :     bool        has_subrels;
     503             : 
     504         420 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     505             : 
     506         420 :     ScanKeyInit(&skey[0],
     507             :                 Anum_pg_subscription_rel_srsubid,
     508             :                 BTEqualStrategyNumber, F_OIDEQ,
     509             :                 ObjectIdGetDatum(subid));
     510             : 
     511         420 :     scan = systable_beginscan(rel, InvalidOid, false,
     512             :                               NULL, 1, skey);
     513             : 
     514             :     /* If even a single tuple exists then the subscription has tables. */
     515         420 :     has_subrels = HeapTupleIsValid(systable_getnext(scan));
     516             : 
     517             :     /* Cleanup */
     518         420 :     systable_endscan(scan);
     519         420 :     table_close(rel, AccessShareLock);
     520             : 
     521         420 :     return has_subrels;
     522             : }
     523             : 
     524             : /*
     525             :  * Get the relations for the subscription.
     526             :  *
     527             :  * If not_ready is true, return only the relations that are not in a ready
     528             :  * state, otherwise return all the relations of the subscription.  The
     529             :  * returned list is palloc'ed in the current memory context.
     530             :  */
     531             : List *
     532        1840 : GetSubscriptionRelations(Oid subid, bool not_ready)
     533             : {
     534        1840 :     List       *res = NIL;
     535             :     Relation    rel;
     536             :     HeapTuple   tup;
     537        1840 :     int         nkeys = 0;
     538             :     ScanKeyData skey[2];
     539             :     SysScanDesc scan;
     540             : 
     541        1840 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     542             : 
     543        1840 :     ScanKeyInit(&skey[nkeys++],
     544             :                 Anum_pg_subscription_rel_srsubid,
     545             :                 BTEqualStrategyNumber, F_OIDEQ,
     546             :                 ObjectIdGetDatum(subid));
     547             : 
     548        1840 :     if (not_ready)
     549        1784 :         ScanKeyInit(&skey[nkeys++],
     550             :                     Anum_pg_subscription_rel_srsubstate,
     551             :                     BTEqualStrategyNumber, F_CHARNE,
     552             :                     CharGetDatum(SUBREL_STATE_READY));
     553             : 
     554        1840 :     scan = systable_beginscan(rel, InvalidOid, false,
     555             :                               NULL, nkeys, skey);
     556             : 
     557        4560 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     558             :     {
     559             :         Form_pg_subscription_rel subrel;
     560             :         SubscriptionRelState *relstate;
     561             :         Datum       d;
     562             :         bool        isnull;
     563             : 
     564        2720 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     565             : 
     566        2720 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
     567        2720 :         relstate->relid = subrel->srrelid;
     568        2720 :         relstate->state = subrel->srsubstate;
     569        2720 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     570             :                             Anum_pg_subscription_rel_srsublsn, &isnull);
     571        2720 :         if (isnull)
     572        2214 :             relstate->lsn = InvalidXLogRecPtr;
     573             :         else
     574         506 :             relstate->lsn = DatumGetLSN(d);
     575             : 
     576        2720 :         res = lappend(res, relstate);
     577             :     }
     578             : 
     579             :     /* Cleanup */
     580        1840 :     systable_endscan(scan);
     581        1840 :     table_close(rel, AccessShareLock);
     582             : 
     583        1840 :     return res;
     584             : }

Generated by: LCOV version 1.14