LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 186 195 95.4 %
Date: 2025-08-09 08:18:06 Functions: 12 12 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_subscription.c
       4             :  *      replication subscriptions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_subscription.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "catalog/indexing.h"
      22             : #include "catalog/pg_subscription.h"
      23             : #include "catalog/pg_subscription_rel.h"
      24             : #include "catalog/pg_type.h"
      25             : #include "miscadmin.h"
      26             : #include "storage/lmgr.h"
      27             : #include "utils/array.h"
      28             : #include "utils/builtins.h"
      29             : #include "utils/fmgroids.h"
      30             : #include "utils/lsyscache.h"
      31             : #include "utils/pg_lsn.h"
      32             : #include "utils/rel.h"
      33             : #include "utils/syscache.h"
      34             : 
      35             : static List *textarray_to_stringlist(ArrayType *textarray);
      36             : 
      37             : /*
      38             :  * Add a comma-separated list of publication names to the 'dest' string.
      39             :  */
      40             : void
      41         984 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
      42             : {
      43             :     ListCell   *lc;
      44         984 :     bool        first = true;
      45             : 
      46             :     Assert(publications != NIL);
      47             : 
      48        2554 :     foreach(lc, publications)
      49             :     {
      50        1570 :         char       *pubname = strVal(lfirst(lc));
      51             : 
      52        1570 :         if (first)
      53         984 :             first = false;
      54             :         else
      55         586 :             appendStringInfoString(dest, ", ");
      56             : 
      57        1570 :         if (quote_literal)
      58        1550 :             appendStringInfoString(dest, quote_literal_cstr(pubname));
      59             :         else
      60             :         {
      61          20 :             appendStringInfoChar(dest, '"');
      62          20 :             appendStringInfoString(dest, pubname);
      63          20 :             appendStringInfoChar(dest, '"');
      64             :         }
      65             :     }
      66         984 : }
      67             : 
      68             : /*
      69             :  * Fetch the subscription from the syscache.
      70             :  */
      71             : Subscription *
      72        1542 : GetSubscription(Oid subid, bool missing_ok)
      73             : {
      74             :     HeapTuple   tup;
      75             :     Subscription *sub;
      76             :     Form_pg_subscription subform;
      77             :     Datum       datum;
      78             :     bool        isnull;
      79             : 
      80        1542 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      81             : 
      82        1540 :     if (!HeapTupleIsValid(tup))
      83             :     {
      84           0 :         if (missing_ok)
      85           0 :             return NULL;
      86             : 
      87           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
      88             :     }
      89             : 
      90        1540 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
      91             : 
      92        1540 :     sub = (Subscription *) palloc(sizeof(Subscription));
      93        1540 :     sub->oid = subid;
      94        1540 :     sub->dbid = subform->subdbid;
      95        1540 :     sub->skiplsn = subform->subskiplsn;
      96        1540 :     sub->name = pstrdup(NameStr(subform->subname));
      97        1540 :     sub->owner = subform->subowner;
      98        1540 :     sub->enabled = subform->subenabled;
      99        1540 :     sub->binary = subform->subbinary;
     100        1540 :     sub->stream = subform->substream;
     101        1540 :     sub->twophasestate = subform->subtwophasestate;
     102        1540 :     sub->disableonerr = subform->subdisableonerr;
     103        1540 :     sub->passwordrequired = subform->subpasswordrequired;
     104        1540 :     sub->runasowner = subform->subrunasowner;
     105        1540 :     sub->failover = subform->subfailover;
     106        1540 :     sub->retaindeadtuples = subform->subretaindeadtuples;
     107             : 
     108             :     /* Get conninfo */
     109        1540 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     110             :                                    tup,
     111             :                                    Anum_pg_subscription_subconninfo);
     112        1540 :     sub->conninfo = TextDatumGetCString(datum);
     113             : 
     114             :     /* Get slotname */
     115        1540 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     116             :                             tup,
     117             :                             Anum_pg_subscription_subslotname,
     118             :                             &isnull);
     119        1540 :     if (!isnull)
     120        1474 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
     121             :     else
     122          66 :         sub->slotname = NULL;
     123             : 
     124             :     /* Get synccommit */
     125        1540 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     126             :                                    tup,
     127             :                                    Anum_pg_subscription_subsynccommit);
     128        1540 :     sub->synccommit = TextDatumGetCString(datum);
     129             : 
     130             :     /* Get publications */
     131        1540 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     132             :                                    tup,
     133             :                                    Anum_pg_subscription_subpublications);
     134        1540 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     135             : 
     136             :     /* Get origin */
     137        1540 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     138             :                                    tup,
     139             :                                    Anum_pg_subscription_suborigin);
     140        1540 :     sub->origin = TextDatumGetCString(datum);
     141             : 
     142             :     /* Is the subscription owner a superuser? */
     143        1540 :     sub->ownersuperuser = superuser_arg(sub->owner);
     144             : 
     145        1540 :     ReleaseSysCache(tup);
     146             : 
     147        1540 :     return sub;
     148             : }
     149             : 
     150             : /*
     151             :  * Return number of subscriptions defined in given database.
     152             :  * Used by dropdb() to check if database can indeed be dropped.
     153             :  */
     154             : int
     155          88 : CountDBSubscriptions(Oid dbid)
     156             : {
     157          88 :     int         nsubs = 0;
     158             :     Relation    rel;
     159             :     ScanKeyData scankey;
     160             :     SysScanDesc scan;
     161             :     HeapTuple   tup;
     162             : 
     163          88 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     164             : 
     165          88 :     ScanKeyInit(&scankey,
     166             :                 Anum_pg_subscription_subdbid,
     167             :                 BTEqualStrategyNumber, F_OIDEQ,
     168             :                 ObjectIdGetDatum(dbid));
     169             : 
     170          88 :     scan = systable_beginscan(rel, InvalidOid, false,
     171             :                               NULL, 1, &scankey);
     172             : 
     173          88 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     174           0 :         nsubs++;
     175             : 
     176          88 :     systable_endscan(scan);
     177             : 
     178          88 :     table_close(rel, NoLock);
     179             : 
     180          88 :     return nsubs;
     181             : }
     182             : 
     183             : /*
     184             :  * Free memory allocated by subscription struct.
     185             :  */
     186             : void
     187          76 : FreeSubscription(Subscription *sub)
     188             : {
     189          76 :     pfree(sub->name);
     190          76 :     pfree(sub->conninfo);
     191          76 :     if (sub->slotname)
     192          76 :         pfree(sub->slotname);
     193          76 :     list_free_deep(sub->publications);
     194          76 :     pfree(sub);
     195          76 : }
     196             : 
     197             : /*
     198             :  * Disable the given subscription.
     199             :  */
     200             : void
     201           8 : DisableSubscription(Oid subid)
     202             : {
     203             :     Relation    rel;
     204             :     bool        nulls[Natts_pg_subscription];
     205             :     bool        replaces[Natts_pg_subscription];
     206             :     Datum       values[Natts_pg_subscription];
     207             :     HeapTuple   tup;
     208             : 
     209             :     /* Look up the subscription in the catalog */
     210           8 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     211           8 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     212             : 
     213           8 :     if (!HeapTupleIsValid(tup))
     214           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     215             : 
     216           8 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     217             : 
     218             :     /* Form a new tuple. */
     219           8 :     memset(values, 0, sizeof(values));
     220           8 :     memset(nulls, false, sizeof(nulls));
     221           8 :     memset(replaces, false, sizeof(replaces));
     222             : 
     223             :     /* Set the subscription to disabled. */
     224           8 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     225           8 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
     226             : 
     227             :     /* Update the catalog */
     228           8 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     229             :                             replaces);
     230           8 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     231           8 :     heap_freetuple(tup);
     232             : 
     233           8 :     table_close(rel, NoLock);
     234           8 : }
     235             : 
     236             : /*
     237             :  * Convert text array to list of strings.
     238             :  *
     239             :  * Note: the resulting list of strings is pallocated here.
     240             :  */
     241             : static List *
     242        1540 : textarray_to_stringlist(ArrayType *textarray)
     243             : {
     244             :     Datum      *elems;
     245             :     int         nelems,
     246             :                 i;
     247        1540 :     List       *res = NIL;
     248             : 
     249        1540 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
     250             : 
     251        1540 :     if (nelems == 0)
     252           0 :         return NIL;
     253             : 
     254        3800 :     for (i = 0; i < nelems; i++)
     255        2260 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     256             : 
     257        1540 :     return res;
     258             : }
     259             : 
     260             : /*
     261             :  * Add new state record for a subscription table.
     262             :  *
     263             :  * If retain_lock is true, then don't release the locks taken in this function.
     264             :  * We normally release the locks at the end of transaction but in binary-upgrade
     265             :  * mode, we expect to release those immediately.
     266             :  */
     267             : void
     268         406 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     269             :                         XLogRecPtr sublsn, bool retain_lock)
     270             : {
     271             :     Relation    rel;
     272             :     HeapTuple   tup;
     273             :     bool        nulls[Natts_pg_subscription_rel];
     274             :     Datum       values[Natts_pg_subscription_rel];
     275             : 
     276         406 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     277             : 
     278         406 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     279             : 
     280             :     /* Try finding existing mapping. */
     281         406 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     282             :                               ObjectIdGetDatum(relid),
     283             :                               ObjectIdGetDatum(subid));
     284         406 :     if (HeapTupleIsValid(tup))
     285           0 :         elog(ERROR, "subscription table %u in subscription %u already exists",
     286             :              relid, subid);
     287             : 
     288             :     /* Form the tuple. */
     289         406 :     memset(values, 0, sizeof(values));
     290         406 :     memset(nulls, false, sizeof(nulls));
     291         406 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     292         406 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     293         406 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     294         406 :     if (sublsn != InvalidXLogRecPtr)
     295           2 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     296             :     else
     297         404 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     298             : 
     299         406 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     300             : 
     301             :     /* Insert tuple into catalog. */
     302         406 :     CatalogTupleInsert(rel, tup);
     303             : 
     304         406 :     heap_freetuple(tup);
     305             : 
     306             :     /* Cleanup. */
     307         406 :     if (retain_lock)
     308             :     {
     309         402 :         table_close(rel, NoLock);
     310             :     }
     311             :     else
     312             :     {
     313           4 :         table_close(rel, RowExclusiveLock);
     314           4 :         UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     315             :     }
     316         406 : }
     317             : 
     318             : /*
     319             :  * Update the state of a subscription table.
     320             :  */
     321             : void
     322        1472 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     323             :                            XLogRecPtr sublsn, bool already_locked)
     324             : {
     325             :     Relation    rel;
     326             :     HeapTuple   tup;
     327             :     bool        nulls[Natts_pg_subscription_rel];
     328             :     Datum       values[Natts_pg_subscription_rel];
     329             :     bool        replaces[Natts_pg_subscription_rel];
     330             : 
     331        1472 :     if (already_locked)
     332             :     {
     333             : #ifdef USE_ASSERT_CHECKING
     334             :         LOCKTAG     tag;
     335             : 
     336             :         Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
     337             :                                           RowExclusiveLock, true));
     338             :         SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
     339             :         Assert(LockHeldByMe(&tag, AccessShareLock, true));
     340             : #endif
     341             : 
     342         356 :         rel = table_open(SubscriptionRelRelationId, NoLock);
     343             :     }
     344             :     else
     345             :     {
     346        1116 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     347        1116 :         rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     348             :     }
     349             : 
     350             :     /* Try finding existing mapping. */
     351        1472 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     352             :                               ObjectIdGetDatum(relid),
     353             :                               ObjectIdGetDatum(subid));
     354        1472 :     if (!HeapTupleIsValid(tup))
     355           0 :         elog(ERROR, "subscription table %u in subscription %u does not exist",
     356             :              relid, subid);
     357             : 
     358             :     /* Update the tuple. */
     359        1472 :     memset(values, 0, sizeof(values));
     360        1472 :     memset(nulls, false, sizeof(nulls));
     361        1472 :     memset(replaces, false, sizeof(replaces));
     362             : 
     363        1472 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     364        1472 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     365             : 
     366        1472 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     367        1472 :     if (sublsn != InvalidXLogRecPtr)
     368         720 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     369             :     else
     370         752 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     371             : 
     372        1472 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     373             :                             replaces);
     374             : 
     375             :     /* Update the catalog. */
     376        1472 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     377             : 
     378             :     /* Cleanup. */
     379        1472 :     table_close(rel, NoLock);
     380        1472 : }
     381             : 
     382             : /*
     383             :  * Get state of subscription table.
     384             :  *
     385             :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     386             :  */
     387             : char
     388        8388 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     389             : {
     390             :     HeapTuple   tup;
     391             :     char        substate;
     392             :     bool        isnull;
     393             :     Datum       d;
     394             :     Relation    rel;
     395             : 
     396             :     /*
     397             :      * This is to avoid the race condition with AlterSubscription which tries
     398             :      * to remove this relstate.
     399             :      */
     400        8388 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     401             : 
     402             :     /* Try finding the mapping. */
     403        8388 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     404             :                           ObjectIdGetDatum(relid),
     405             :                           ObjectIdGetDatum(subid));
     406             : 
     407        8388 :     if (!HeapTupleIsValid(tup))
     408             :     {
     409          52 :         table_close(rel, AccessShareLock);
     410          52 :         *sublsn = InvalidXLogRecPtr;
     411          52 :         return SUBREL_STATE_UNKNOWN;
     412             :     }
     413             : 
     414             :     /* Get the state. */
     415        8336 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     416             : 
     417             :     /* Get the LSN */
     418        8336 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     419             :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     420        8336 :     if (isnull)
     421        7234 :         *sublsn = InvalidXLogRecPtr;
     422             :     else
     423        1102 :         *sublsn = DatumGetLSN(d);
     424             : 
     425             :     /* Cleanup */
     426        8336 :     ReleaseSysCache(tup);
     427             : 
     428        8336 :     table_close(rel, AccessShareLock);
     429             : 
     430        8336 :     return substate;
     431             : }
     432             : 
     433             : /*
     434             :  * Drop subscription relation mapping. These can be for a particular
     435             :  * subscription, or for a particular relation, or both.
     436             :  */
     437             : void
     438       48176 : RemoveSubscriptionRel(Oid subid, Oid relid)
     439             : {
     440             :     Relation    rel;
     441             :     TableScanDesc scan;
     442             :     ScanKeyData skey[2];
     443             :     HeapTuple   tup;
     444       48176 :     int         nkeys = 0;
     445             : 
     446       48176 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     447             : 
     448       48176 :     if (OidIsValid(subid))
     449             :     {
     450         260 :         ScanKeyInit(&skey[nkeys++],
     451             :                     Anum_pg_subscription_rel_srsubid,
     452             :                     BTEqualStrategyNumber,
     453             :                     F_OIDEQ,
     454             :                     ObjectIdGetDatum(subid));
     455             :     }
     456             : 
     457       48176 :     if (OidIsValid(relid))
     458             :     {
     459       47956 :         ScanKeyInit(&skey[nkeys++],
     460             :                     Anum_pg_subscription_rel_srrelid,
     461             :                     BTEqualStrategyNumber,
     462             :                     F_OIDEQ,
     463             :                     ObjectIdGetDatum(relid));
     464             :     }
     465             : 
     466             :     /* Do the search and delete what we found. */
     467       48176 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     468       48402 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     469             :     {
     470             :         Form_pg_subscription_rel subrel;
     471             : 
     472         226 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     473             : 
     474             :         /*
     475             :          * We don't allow to drop the relation mapping when the table
     476             :          * synchronization is in progress unless the caller updates the
     477             :          * corresponding subscription as well. This is to ensure that we don't
     478             :          * leave tablesync slots or origins in the system when the
     479             :          * corresponding table is dropped.
     480             :          */
     481         226 :         if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
     482             :         {
     483           0 :             ereport(ERROR,
     484             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     485             :                      errmsg("could not drop relation mapping for subscription \"%s\"",
     486             :                             get_subscription_name(subrel->srsubid, false)),
     487             :                      errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
     488             :                                get_rel_name(relid), subrel->srsubstate),
     489             : 
     490             :             /*
     491             :              * translator: first %s is a SQL ALTER command and second %s is a
     492             :              * SQL DROP command
     493             :              */
     494             :                      errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
     495             :                              "ALTER SUBSCRIPTION ... ENABLE",
     496             :                              "DROP SUBSCRIPTION ...")));
     497             :         }
     498             : 
     499         226 :         CatalogTupleDelete(rel, &tup->t_self);
     500             :     }
     501       48176 :     table_endscan(scan);
     502             : 
     503       48176 :     table_close(rel, RowExclusiveLock);
     504       48176 : }
     505             : 
     506             : /*
     507             :  * Does the subscription have any relations?
     508             :  *
     509             :  * Use this function only to know true/false, and when you have no need for the
     510             :  * List returned by GetSubscriptionRelations.
     511             :  */
     512             : bool
     513         492 : HasSubscriptionRelations(Oid subid)
     514             : {
     515             :     Relation    rel;
     516             :     ScanKeyData skey[1];
     517             :     SysScanDesc scan;
     518             :     bool        has_subrels;
     519             : 
     520         492 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     521             : 
     522         492 :     ScanKeyInit(&skey[0],
     523             :                 Anum_pg_subscription_rel_srsubid,
     524             :                 BTEqualStrategyNumber, F_OIDEQ,
     525             :                 ObjectIdGetDatum(subid));
     526             : 
     527         492 :     scan = systable_beginscan(rel, InvalidOid, false,
     528             :                               NULL, 1, skey);
     529             : 
     530             :     /* If even a single tuple exists then the subscription has tables. */
     531         492 :     has_subrels = HeapTupleIsValid(systable_getnext(scan));
     532             : 
     533             :     /* Cleanup */
     534         492 :     systable_endscan(scan);
     535         492 :     table_close(rel, AccessShareLock);
     536             : 
     537         492 :     return has_subrels;
     538             : }
     539             : 
     540             : /*
     541             :  * Get the relations for the subscription.
     542             :  *
     543             :  * If not_ready is true, return only the relations that are not in a ready
     544             :  * state, otherwise return all the relations of the subscription.  The
     545             :  * returned list is palloc'ed in the current memory context.
     546             :  */
     547             : List *
     548        2028 : GetSubscriptionRelations(Oid subid, bool not_ready)
     549             : {
     550        2028 :     List       *res = NIL;
     551             :     Relation    rel;
     552             :     HeapTuple   tup;
     553        2028 :     int         nkeys = 0;
     554             :     ScanKeyData skey[2];
     555             :     SysScanDesc scan;
     556             : 
     557        2028 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     558             : 
     559        2028 :     ScanKeyInit(&skey[nkeys++],
     560             :                 Anum_pg_subscription_rel_srsubid,
     561             :                 BTEqualStrategyNumber, F_OIDEQ,
     562             :                 ObjectIdGetDatum(subid));
     563             : 
     564        2028 :     if (not_ready)
     565        1964 :         ScanKeyInit(&skey[nkeys++],
     566             :                     Anum_pg_subscription_rel_srsubstate,
     567             :                     BTEqualStrategyNumber, F_CHARNE,
     568             :                     CharGetDatum(SUBREL_STATE_READY));
     569             : 
     570        2028 :     scan = systable_beginscan(rel, InvalidOid, false,
     571             :                               NULL, nkeys, skey);
     572             : 
     573        5098 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     574             :     {
     575             :         Form_pg_subscription_rel subrel;
     576             :         SubscriptionRelState *relstate;
     577             :         Datum       d;
     578             :         bool        isnull;
     579             : 
     580        3070 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     581             : 
     582        3070 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
     583        3070 :         relstate->relid = subrel->srrelid;
     584        3070 :         relstate->state = subrel->srsubstate;
     585        3070 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     586             :                             Anum_pg_subscription_rel_srsublsn, &isnull);
     587        3070 :         if (isnull)
     588        2532 :             relstate->lsn = InvalidXLogRecPtr;
     589             :         else
     590         538 :             relstate->lsn = DatumGetLSN(d);
     591             : 
     592        3070 :         res = lappend(res, relstate);
     593             :     }
     594             : 
     595             :     /* Cleanup */
     596        2028 :     systable_endscan(scan);
     597        2028 :     table_close(rel, AccessShareLock);
     598             : 
     599        2028 :     return res;
     600             : }

Generated by: LCOV version 1.16