LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 95.2 % 227 216
Test Date: 2026-02-17 17:20:33 Functions: 100.0 % 13 13
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_subscription.c
       4              :  *      replication subscriptions
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/backend/catalog/pg_subscription.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/genam.h"
      18              : #include "access/heapam.h"
      19              : #include "access/htup_details.h"
      20              : #include "access/tableam.h"
      21              : #include "catalog/indexing.h"
      22              : #include "catalog/pg_subscription.h"
      23              : #include "catalog/pg_subscription_rel.h"
      24              : #include "catalog/pg_type.h"
      25              : #include "miscadmin.h"
      26              : #include "storage/lmgr.h"
      27              : #include "utils/array.h"
      28              : #include "utils/builtins.h"
      29              : #include "utils/fmgroids.h"
      30              : #include "utils/lsyscache.h"
      31              : #include "utils/pg_lsn.h"
      32              : #include "utils/rel.h"
      33              : #include "utils/syscache.h"
      34              : 
      35              : static List *textarray_to_stringlist(ArrayType *textarray);
      36              : 
      37              : /*
      38              :  * Add a comma-separated list of publication names to the 'dest' string.
      39              :  */
      40              : void
      41          517 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
      42              : {
      43              :     ListCell   *lc;
      44          517 :     bool        first = true;
      45              : 
      46              :     Assert(publications != NIL);
      47              : 
      48         1337 :     foreach(lc, publications)
      49              :     {
      50          820 :         char       *pubname = strVal(lfirst(lc));
      51              : 
      52          820 :         if (first)
      53          517 :             first = false;
      54              :         else
      55          303 :             appendStringInfoString(dest, ", ");
      56              : 
      57          820 :         if (quote_literal)
      58          810 :             appendStringInfoString(dest, quote_literal_cstr(pubname));
      59              :         else
      60              :         {
      61           10 :             appendStringInfoChar(dest, '"');
      62           10 :             appendStringInfoString(dest, pubname);
      63           10 :             appendStringInfoChar(dest, '"');
      64              :         }
      65              :     }
      66          517 : }
      67              : 
      68              : /*
      69              :  * Fetch the subscription from the syscache.
      70              :  */
      71              : Subscription *
      72          887 : GetSubscription(Oid subid, bool missing_ok)
      73              : {
      74              :     HeapTuple   tup;
      75              :     Subscription *sub;
      76              :     Form_pg_subscription subform;
      77              :     Datum       datum;
      78              :     bool        isnull;
      79              : 
      80          887 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      81              : 
      82          887 :     if (!HeapTupleIsValid(tup))
      83              :     {
      84           58 :         if (missing_ok)
      85           58 :             return NULL;
      86              : 
      87            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
      88              :     }
      89              : 
      90          829 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
      91              : 
      92          829 :     sub = palloc_object(Subscription);
      93          829 :     sub->oid = subid;
      94          829 :     sub->dbid = subform->subdbid;
      95          829 :     sub->skiplsn = subform->subskiplsn;
      96          829 :     sub->name = pstrdup(NameStr(subform->subname));
      97          829 :     sub->owner = subform->subowner;
      98          829 :     sub->enabled = subform->subenabled;
      99          829 :     sub->binary = subform->subbinary;
     100          829 :     sub->stream = subform->substream;
     101          829 :     sub->twophasestate = subform->subtwophasestate;
     102          829 :     sub->disableonerr = subform->subdisableonerr;
     103          829 :     sub->passwordrequired = subform->subpasswordrequired;
     104          829 :     sub->runasowner = subform->subrunasowner;
     105          829 :     sub->failover = subform->subfailover;
     106          829 :     sub->retaindeadtuples = subform->subretaindeadtuples;
     107          829 :     sub->maxretention = subform->submaxretention;
     108          829 :     sub->retentionactive = subform->subretentionactive;
     109              : 
     110              :     /* Get conninfo */
     111          829 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     112              :                                    tup,
     113              :                                    Anum_pg_subscription_subconninfo);
     114          829 :     sub->conninfo = TextDatumGetCString(datum);
     115              : 
     116              :     /* Get slotname */
     117          829 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     118              :                             tup,
     119              :                             Anum_pg_subscription_subslotname,
     120              :                             &isnull);
     121          829 :     if (!isnull)
     122          796 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
     123              :     else
     124           33 :         sub->slotname = NULL;
     125              : 
     126              :     /* Get synccommit */
     127          829 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     128              :                                    tup,
     129              :                                    Anum_pg_subscription_subsynccommit);
     130          829 :     sub->synccommit = TextDatumGetCString(datum);
     131              : 
     132              :     /* Get publications */
     133          829 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     134              :                                    tup,
     135              :                                    Anum_pg_subscription_subpublications);
     136          829 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     137              : 
     138              :     /* Get origin */
     139          829 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     140              :                                    tup,
     141              :                                    Anum_pg_subscription_suborigin);
     142          829 :     sub->origin = TextDatumGetCString(datum);
     143              : 
     144              :     /* Is the subscription owner a superuser? */
     145          829 :     sub->ownersuperuser = superuser_arg(sub->owner);
     146              : 
     147          829 :     ReleaseSysCache(tup);
     148              : 
     149          829 :     return sub;
     150              : }
     151              : 
     152              : /*
     153              :  * Return number of subscriptions defined in given database.
     154              :  * Used by dropdb() to check if database can indeed be dropped.
     155              :  */
     156              : int
     157           47 : CountDBSubscriptions(Oid dbid)
     158              : {
     159           47 :     int         nsubs = 0;
     160              :     Relation    rel;
     161              :     ScanKeyData scankey;
     162              :     SysScanDesc scan;
     163              :     HeapTuple   tup;
     164              : 
     165           47 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     166              : 
     167           47 :     ScanKeyInit(&scankey,
     168              :                 Anum_pg_subscription_subdbid,
     169              :                 BTEqualStrategyNumber, F_OIDEQ,
     170              :                 ObjectIdGetDatum(dbid));
     171              : 
     172           47 :     scan = systable_beginscan(rel, InvalidOid, false,
     173              :                               NULL, 1, &scankey);
     174              : 
     175           47 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     176            0 :         nsubs++;
     177              : 
     178           47 :     systable_endscan(scan);
     179              : 
     180           47 :     table_close(rel, NoLock);
     181              : 
     182           47 :     return nsubs;
     183              : }
     184              : 
     185              : /*
     186              :  * Free memory allocated by subscription struct.
     187              :  */
     188              : void
     189           43 : FreeSubscription(Subscription *sub)
     190              : {
     191           43 :     pfree(sub->name);
     192           43 :     pfree(sub->conninfo);
     193           43 :     if (sub->slotname)
     194           43 :         pfree(sub->slotname);
     195           43 :     list_free_deep(sub->publications);
     196           43 :     pfree(sub);
     197           43 : }
     198              : 
     199              : /*
     200              :  * Disable the given subscription.
     201              :  */
     202              : void
     203            4 : DisableSubscription(Oid subid)
     204              : {
     205              :     Relation    rel;
     206              :     bool        nulls[Natts_pg_subscription];
     207              :     bool        replaces[Natts_pg_subscription];
     208              :     Datum       values[Natts_pg_subscription];
     209              :     HeapTuple   tup;
     210              : 
     211              :     /* Look up the subscription in the catalog */
     212            4 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     213            4 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     214              : 
     215            4 :     if (!HeapTupleIsValid(tup))
     216            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     217              : 
     218            4 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     219              : 
     220              :     /* Form a new tuple. */
     221            4 :     memset(values, 0, sizeof(values));
     222            4 :     memset(nulls, false, sizeof(nulls));
     223            4 :     memset(replaces, false, sizeof(replaces));
     224              : 
     225              :     /* Set the subscription to disabled. */
     226            4 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     227            4 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
     228              : 
     229              :     /* Update the catalog */
     230            4 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     231              :                             replaces);
     232            4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     233            4 :     heap_freetuple(tup);
     234              : 
     235            4 :     table_close(rel, NoLock);
     236            4 : }
     237              : 
     238              : /*
     239              :  * Convert text array to list of strings.
     240              :  *
     241              :  * Note: the resulting list of strings is pallocated here.
     242              :  */
     243              : static List *
     244          829 : textarray_to_stringlist(ArrayType *textarray)
     245              : {
     246              :     Datum      *elems;
     247              :     int         nelems,
     248              :                 i;
     249          829 :     List       *res = NIL;
     250              : 
     251          829 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
     252              : 
     253          829 :     if (nelems == 0)
     254            0 :         return NIL;
     255              : 
     256         2035 :     for (i = 0; i < nelems; i++)
     257         1206 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     258              : 
     259          829 :     return res;
     260              : }
     261              : 
     262              : /*
     263              :  * Add new state record for a subscription table.
     264              :  *
     265              :  * If retain_lock is true, then don't release the locks taken in this function.
     266              :  * We normally release the locks at the end of transaction but in binary-upgrade
     267              :  * mode, we expect to release those immediately.
     268              :  */
     269              : void
     270          216 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     271              :                         XLogRecPtr sublsn, bool retain_lock)
     272              : {
     273              :     Relation    rel;
     274              :     HeapTuple   tup;
     275              :     bool        nulls[Natts_pg_subscription_rel];
     276              :     Datum       values[Natts_pg_subscription_rel];
     277              : 
     278          216 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     279              : 
     280          216 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     281              : 
     282              :     /* Try finding existing mapping. */
     283          216 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     284              :                               ObjectIdGetDatum(relid),
     285              :                               ObjectIdGetDatum(subid));
     286          216 :     if (HeapTupleIsValid(tup))
     287            0 :         elog(ERROR, "subscription relation %u in subscription %u already exists",
     288              :              relid, subid);
     289              : 
     290              :     /* Form the tuple. */
     291          216 :     memset(values, 0, sizeof(values));
     292          216 :     memset(nulls, false, sizeof(nulls));
     293          216 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     294          216 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     295          216 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     296          216 :     if (XLogRecPtrIsValid(sublsn))
     297            2 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     298              :     else
     299          214 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     300              : 
     301          216 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     302              : 
     303              :     /* Insert tuple into catalog. */
     304          216 :     CatalogTupleInsert(rel, tup);
     305              : 
     306          216 :     heap_freetuple(tup);
     307              : 
     308              :     /* Cleanup. */
     309          216 :     if (retain_lock)
     310              :     {
     311          213 :         table_close(rel, NoLock);
     312              :     }
     313              :     else
     314              :     {
     315            3 :         table_close(rel, RowExclusiveLock);
     316            3 :         UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     317              :     }
     318          216 : }
     319              : 
     320              : /*
     321              :  * Update the state of a subscription table.
     322              :  */
     323              : void
     324          762 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     325              :                            XLogRecPtr sublsn, bool already_locked)
     326              : {
     327              :     Relation    rel;
     328              :     HeapTuple   tup;
     329              :     bool        nulls[Natts_pg_subscription_rel];
     330              :     Datum       values[Natts_pg_subscription_rel];
     331              :     bool        replaces[Natts_pg_subscription_rel];
     332              : 
     333          762 :     if (already_locked)
     334              :     {
     335              : #ifdef USE_ASSERT_CHECKING
     336              :         LOCKTAG     tag;
     337              : 
     338              :         Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
     339              :                                           RowExclusiveLock, true));
     340              :         SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
     341              :         Assert(LockHeldByMe(&tag, AccessShareLock, true));
     342              : #endif
     343              : 
     344          179 :         rel = table_open(SubscriptionRelRelationId, NoLock);
     345              :     }
     346              :     else
     347              :     {
     348          583 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     349          583 :         rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     350              :     }
     351              : 
     352              :     /* Try finding existing mapping. */
     353          762 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     354              :                               ObjectIdGetDatum(relid),
     355              :                               ObjectIdGetDatum(subid));
     356          762 :     if (!HeapTupleIsValid(tup))
     357            0 :         elog(ERROR, "subscription relation %u in subscription %u does not exist",
     358              :              relid, subid);
     359              : 
     360              :     /* Update the tuple. */
     361          762 :     memset(values, 0, sizeof(values));
     362          762 :     memset(nulls, false, sizeof(nulls));
     363          762 :     memset(replaces, false, sizeof(replaces));
     364              : 
     365          762 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     366          762 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     367              : 
     368          762 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     369          762 :     if (XLogRecPtrIsValid(sublsn))
     370          374 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     371              :     else
     372          388 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     373              : 
     374          762 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     375              :                             replaces);
     376              : 
     377              :     /* Update the catalog. */
     378          762 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     379              : 
     380              :     /* Cleanup. */
     381          762 :     table_close(rel, NoLock);
     382          762 : }
     383              : 
     384              : /*
     385              :  * Get state of subscription table.
     386              :  *
     387              :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     388              :  */
     389              : char
     390         1259 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     391              : {
     392              :     HeapTuple   tup;
     393              :     char        substate;
     394              :     bool        isnull;
     395              :     Datum       d;
     396              :     Relation    rel;
     397              : 
     398              :     /*
     399              :      * This is to avoid the race condition with AlterSubscription which tries
     400              :      * to remove this relstate.
     401              :      */
     402         1259 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     403              : 
     404              :     /* Try finding the mapping. */
     405         1259 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     406              :                           ObjectIdGetDatum(relid),
     407              :                           ObjectIdGetDatum(subid));
     408              : 
     409         1259 :     if (!HeapTupleIsValid(tup))
     410              :     {
     411           25 :         table_close(rel, AccessShareLock);
     412           25 :         *sublsn = InvalidXLogRecPtr;
     413           25 :         return SUBREL_STATE_UNKNOWN;
     414              :     }
     415              : 
     416              :     /* Get the state. */
     417         1234 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     418              : 
     419              :     /* Get the LSN */
     420         1234 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     421              :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     422         1234 :     if (isnull)
     423          651 :         *sublsn = InvalidXLogRecPtr;
     424              :     else
     425          583 :         *sublsn = DatumGetLSN(d);
     426              : 
     427              :     /* Cleanup */
     428         1234 :     ReleaseSysCache(tup);
     429              : 
     430         1234 :     table_close(rel, AccessShareLock);
     431              : 
     432         1234 :     return substate;
     433              : }
     434              : 
/*
 * Drop subscription relation mapping. These can be for a particular
 * subscription, or for a particular relation, or both.
 *
 * A scan key is added for 'subid' and for 'relid' only when the respective
 * OID is valid, so each valid argument narrows the set of
 * pg_subscription_rel rows that get deleted.
 *
 * When 'subid' is invalid, a matching row that is not in READY state and
 * does not belong to a sequence raises an error instead of being deleted.
 *
 * pg_subscription_rel is opened with RowExclusiveLock, and the same lock
 * mode is passed to table_close() before returning.
 */
void
RemoveSubscriptionRel(Oid subid, Oid relid)
{
    Relation    rel;
    TableScanDesc scan;
    ScanKeyData skey[2];
    HeapTuple   tup;
    int         nkeys = 0;      /* number of skey[] entries filled in */

    rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);

    /* Restrict the scan to the given subscription, if one was specified. */
    if (OidIsValid(subid))
    {
        ScanKeyInit(&skey[nkeys++],
                    Anum_pg_subscription_rel_srsubid,
                    BTEqualStrategyNumber,
                    F_OIDEQ,
                    ObjectIdGetDatum(subid));
    }

    /* Restrict the scan to the given relation, if one was specified. */
    if (OidIsValid(relid))
    {
        ScanKeyInit(&skey[nkeys++],
                    Anum_pg_subscription_rel_srrelid,
                    BTEqualStrategyNumber,
                    F_OIDEQ,
                    ObjectIdGetDatum(relid));
    }

    /* Do the search and delete what we found. */
    scan = table_beginscan_catalog(rel, nkeys, skey);
    while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
    {
        Form_pg_subscription_rel subrel;

        subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);

        /*
         * We don't allow to drop the relation mapping when the table
         * synchronization is in progress unless the caller updates the
         * corresponding subscription as well. This is to ensure that we don't
         * leave tablesync slots or origins in the system when the
         * corresponding table is dropped. For sequences, however, it's ok to
         * drop them since no separate slots or origins are created during
         * synchronization.
         */
        if (!OidIsValid(subid) &&
            subrel->srsubstate != SUBREL_STATE_READY &&
            get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
        {
            /*
             * The error names the subscription via the row's srsubid, and
             * the relation via the 'relid' argument.
             */
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("could not drop relation mapping for subscription \"%s\"",
                            get_subscription_name(subrel->srsubid, false)),
                     errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
                               get_rel_name(relid), subrel->srsubstate),

            /*
             * translator: first %s is a SQL ALTER command and second %s is a
             * SQL DROP command
             */
                     errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
                             "ALTER SUBSCRIPTION ... ENABLE",
                             "DROP SUBSCRIPTION ...")));
        }

        CatalogTupleDelete(rel, &tup->t_self);
    }
    table_endscan(scan);

    table_close(rel, RowExclusiveLock);
}
     511              : 
     512              : /*
     513              :  * Does the subscription have any tables?
     514              :  *
     515              :  * Use this function only to know true/false, and when you have no need for the
     516              :  * List returned by GetSubscriptionRelations.
     517              :  */
     518              : bool
     519          258 : HasSubscriptionTables(Oid subid)
     520              : {
     521              :     Relation    rel;
     522              :     ScanKeyData skey[1];
     523              :     SysScanDesc scan;
     524              :     HeapTuple   tup;
     525          258 :     bool        has_subtables = false;
     526              : 
     527          258 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     528              : 
     529          258 :     ScanKeyInit(&skey[0],
     530              :                 Anum_pg_subscription_rel_srsubid,
     531              :                 BTEqualStrategyNumber, F_OIDEQ,
     532              :                 ObjectIdGetDatum(subid));
     533              : 
     534          258 :     scan = systable_beginscan(rel, InvalidOid, false,
     535              :                               NULL, 1, skey);
     536              : 
     537          269 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     538              :     {
     539              :         Form_pg_subscription_rel subrel;
     540              :         char        relkind;
     541              : 
     542          263 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     543          263 :         relkind = get_rel_relkind(subrel->srrelid);
     544              : 
     545          263 :         if (relkind == RELKIND_RELATION ||
     546              :             relkind == RELKIND_PARTITIONED_TABLE)
     547              :         {
     548          252 :             has_subtables = true;
     549          252 :             break;
     550              :         }
     551              :     }
     552              : 
     553              :     /* Cleanup */
     554          258 :     systable_endscan(scan);
     555          258 :     table_close(rel, AccessShareLock);
     556              : 
     557          258 :     return has_subtables;
     558              : }
     559              : 
     560              : /*
     561              :  * Get the relations for the subscription.
     562              :  *
     563              :  * If not_ready is true, return only the relations that are not in a ready
     564              :  * state, otherwise return all the relations of the subscription.  The
     565              :  * returned list is palloc'ed in the current memory context.
     566              :  */
     567              : List *
     568         1113 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
     569              :                          bool not_ready)
     570              : {
     571         1113 :     List       *res = NIL;
     572              :     Relation    rel;
     573              :     HeapTuple   tup;
     574         1113 :     int         nkeys = 0;
     575              :     ScanKeyData skey[2];
     576              :     SysScanDesc scan;
     577              : 
     578              :     /* One or both of 'tables' and 'sequences' must be true. */
     579              :     Assert(tables || sequences);
     580              : 
     581         1113 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     582              : 
     583         1113 :     ScanKeyInit(&skey[nkeys++],
     584              :                 Anum_pg_subscription_rel_srsubid,
     585              :                 BTEqualStrategyNumber, F_OIDEQ,
     586              :                 ObjectIdGetDatum(subid));
     587              : 
     588         1113 :     if (not_ready)
     589         1075 :         ScanKeyInit(&skey[nkeys++],
     590              :                     Anum_pg_subscription_rel_srsubstate,
     591              :                     BTEqualStrategyNumber, F_CHARNE,
     592              :                     CharGetDatum(SUBREL_STATE_READY));
     593              : 
     594         1113 :     scan = systable_beginscan(rel, InvalidOid, false,
     595              :                               NULL, nkeys, skey);
     596              : 
     597         2985 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     598              :     {
     599              :         Form_pg_subscription_rel subrel;
     600              :         SubscriptionRelState *relstate;
     601              :         Datum       d;
     602              :         bool        isnull;
     603              :         char        relkind;
     604              : 
     605         1872 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     606              : 
     607              :         /* Relation is either a sequence or a table */
     608         1872 :         relkind = get_rel_relkind(subrel->srrelid);
     609              :         Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
     610              :                relkind == RELKIND_PARTITIONED_TABLE);
     611              : 
     612              :         /* Skip sequences if they were not requested */
     613         1872 :         if ((relkind == RELKIND_SEQUENCE) && !sequences)
     614            0 :             continue;
     615              : 
     616              :         /* Skip tables if they were not requested */
     617         1872 :         if ((relkind == RELKIND_RELATION ||
     618         1851 :              relkind == RELKIND_PARTITIONED_TABLE) && !tables)
     619            0 :             continue;
     620              : 
     621         1872 :         relstate = palloc_object(SubscriptionRelState);
     622         1872 :         relstate->relid = subrel->srrelid;
     623         1872 :         relstate->state = subrel->srsubstate;
     624         1872 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     625              :                             Anum_pg_subscription_rel_srsublsn, &isnull);
     626         1872 :         if (isnull)
     627         1529 :             relstate->lsn = InvalidXLogRecPtr;
     628              :         else
     629          343 :             relstate->lsn = DatumGetLSN(d);
     630              : 
     631         1872 :         res = lappend(res, relstate);
     632              :     }
     633              : 
     634              :     /* Cleanup */
     635         1113 :     systable_endscan(scan);
     636         1113 :     table_close(rel, AccessShareLock);
     637              : 
     638         1113 :     return res;
     639              : }
     640              : 
     641              : /*
     642              :  * Update the dead tuple retention status for the given subscription.
     643              :  */
     644              : void
     645            2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
     646              : {
     647              :     Relation    rel;
     648              :     bool        nulls[Natts_pg_subscription];
     649              :     bool        replaces[Natts_pg_subscription];
     650              :     Datum       values[Natts_pg_subscription];
     651              :     HeapTuple   tup;
     652              : 
     653              :     /* Look up the subscription in the catalog */
     654            2 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     655            2 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     656              : 
     657            2 :     if (!HeapTupleIsValid(tup))
     658            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     659              : 
     660            2 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     661              : 
     662              :     /* Form a new tuple. */
     663            2 :     memset(values, 0, sizeof(values));
     664            2 :     memset(nulls, false, sizeof(nulls));
     665            2 :     memset(replaces, false, sizeof(replaces));
     666              : 
     667              :     /* Set the subscription to disabled. */
     668            2 :     values[Anum_pg_subscription_subretentionactive - 1] = active;
     669            2 :     replaces[Anum_pg_subscription_subretentionactive - 1] = true;
     670              : 
     671              :     /* Update the catalog */
     672            2 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     673              :                             replaces);
     674            2 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     675            2 :     heap_freetuple(tup);
     676              : 
     677            2 :     table_close(rel, NoLock);
     678            2 : }
        

Generated by: LCOV version 2.0-1