LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 205 213 96.2 %
Date: 2025-09-18 14:17:54 Functions: 13 13 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_subscription.c
       4             :  *      replication subscriptions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_subscription.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "catalog/indexing.h"
      22             : #include "catalog/pg_subscription.h"
      23             : #include "catalog/pg_subscription_rel.h"
      24             : #include "catalog/pg_type.h"
      25             : #include "miscadmin.h"
      26             : #include "storage/lmgr.h"
      27             : #include "utils/array.h"
      28             : #include "utils/builtins.h"
      29             : #include "utils/fmgroids.h"
      30             : #include "utils/lsyscache.h"
      31             : #include "utils/pg_lsn.h"
      32             : #include "utils/rel.h"
      33             : #include "utils/syscache.h"
      34             : 
      35             : static List *textarray_to_stringlist(ArrayType *textarray);
      36             : 
      37             : /*
      38             :  * Append the names in 'publications' to 'dest' as a comma-separated list. Each name is passed through quote_literal_cstr() when 'quote_literal' is true; otherwise it is wrapped in double quotes as-is. 'publications' must not be NIL (asserted).
      39             :  */
      40             : void
      41         988 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
      42             : {
      43             :     ListCell   *lc;
      44         988 :     bool        first = true;   /* no separator is written before the first name */
      45             : 
      46             :     Assert(publications != NIL);
      47             : 
      48        2562 :     foreach(lc, publications)
      49             :     {
      50        1574 :         char       *pubname = strVal(lfirst(lc));
      51             : 
      52        1574 :         if (first)
      53         988 :             first = false;
      54             :         else                    /* every later name is preceded by a separator */
      55         586 :             appendStringInfoString(dest, ", ");
      56             : 
      57        1574 :         if (quote_literal)
      58        1554 :             appendStringInfoString(dest, quote_literal_cstr(pubname));
      59             :         else
      60             :         {                       /* name goes between double quotes; no escaping is applied here */
      61          20 :             appendStringInfoChar(dest, '"');
      62          20 :             appendStringInfoString(dest, pubname);
      63          20 :             appendStringInfoChar(dest, '"');
      64             :         }
      65             :     }
      66         988 : }
      67             : 
      68             : /*
      69             :  * Build a newly palloc'd Subscription struct for 'subid' from its syscache entry. Returns NULL when no entry exists and 'missing_ok' is true; reports elog(ERROR) when it is missing and 'missing_ok' is false.
      70             :  */
      71             : Subscription *
      72        1704 : GetSubscription(Oid subid, bool missing_ok)
      73             : {
      74             :     HeapTuple   tup;
      75             :     Subscription *sub;
      76             :     Form_pg_subscription subform;
      77             :     Datum       datum;
      78             :     bool        isnull;
      79             : 
      80        1704 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      81             : 
      82        1704 :     if (!HeapTupleIsValid(tup))
      83             :     {
      84         104 :         if (missing_ok)
      85         104 :             return NULL;
      86             : 
      87           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
      88             :     }
      89             : 
      90        1600 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
      91             : 
      92        1600 :     sub = (Subscription *) palloc(sizeof(Subscription));
      93        1600 :     sub->oid = subid;
      94        1600 :     sub->dbid = subform->subdbid;
      95        1600 :     sub->skiplsn = subform->subskiplsn;
      96        1600 :     sub->name = pstrdup(NameStr(subform->subname));
      97        1600 :     sub->owner = subform->subowner;
      98        1600 :     sub->enabled = subform->subenabled;
      99        1600 :     sub->binary = subform->subbinary;
     100        1600 :     sub->stream = subform->substream;
     101        1600 :     sub->twophasestate = subform->subtwophasestate;
     102        1600 :     sub->disableonerr = subform->subdisableonerr;
     103        1600 :     sub->passwordrequired = subform->subpasswordrequired;
     104        1600 :     sub->runasowner = subform->subrunasowner;
     105        1600 :     sub->failover = subform->subfailover;
     106        1600 :     sub->retaindeadtuples = subform->subretaindeadtuples;
     107        1600 :     sub->maxretention = subform->submaxretention;
     108        1600 :     sub->retentionactive = subform->subretentionactive;
     109             : 
     110             :     /* Get conninfo as a C string */
     111        1600 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     112             :                                    tup,
     113             :                                    Anum_pg_subscription_subconninfo);
     114        1600 :     sub->conninfo = TextDatumGetCString(datum);
     115             : 
     116             :     /* Get slotname; a NULL column yields a NULL slotname */
     117        1600 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     118             :                             tup,
     119             :                             Anum_pg_subscription_subslotname,
     120             :                             &isnull);
     121        1600 :     if (!isnull)
     122        1534 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
     123             :     else
     124          66 :         sub->slotname = NULL;
     125             : 
     126             :     /* Get synccommit as a C string */
     127        1600 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     128             :                                    tup,
     129             :                                    Anum_pg_subscription_subsynccommit);
     130        1600 :     sub->synccommit = TextDatumGetCString(datum);
     131             : 
     132             :     /* Get publications, converted from a text array to a List of String nodes */
     133        1600 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     134             :                                    tup,
     135             :                                    Anum_pg_subscription_subpublications);
     136        1600 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     137             : 
     138             :     /* Get origin as a C string */
     139        1600 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     140             :                                    tup,
     141             :                                    Anum_pg_subscription_suborigin);
     142        1600 :     sub->origin = TextDatumGetCString(datum);
     143             : 
     144             :     /* Is the subscription owner a superuser? */
     145        1600 :     sub->ownersuperuser = superuser_arg(sub->owner);
     146             : 
     147        1600 :     ReleaseSysCache(tup);   /* done with the cache entry */
     148             : 
     149        1600 :     return sub;
     150             : }
     151             : 
     152             : /*
     153             :  * Return the number of pg_subscription rows whose subdbid equals 'dbid'.
     154             :  * Used by dropdb() to check if database can indeed be dropped.
     155             :  */
     156             : int
     157          90 : CountDBSubscriptions(Oid dbid)
     158             : {
     159          90 :     int         nsubs = 0;
     160             :     Relation    rel;
     161             :     ScanKeyData scankey;
     162             :     SysScanDesc scan;
     163             :     HeapTuple   tup;
     164             : 
     165          90 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock); /* NOTE(review): the function only reads; confirm this lock level is intended */
     166             : 
     167          90 :     ScanKeyInit(&scankey,
     168             :                 Anum_pg_subscription_subdbid,
     169             :                 BTEqualStrategyNumber, F_OIDEQ,
     170             :                 ObjectIdGetDatum(dbid));
     171             : 
     172          90 :     scan = systable_beginscan(rel, InvalidOid, false,
     173             :                               NULL, 1, &scankey);   /* no index is named (InvalidOid) */
     174             : 
     175          90 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     176           0 :         nsubs++;
     177             : 
     178          90 :     systable_endscan(scan);
     179             : 
     180          90 :     table_close(rel, NoLock);   /* NoLock: the lock taken by table_open() is not released here */
     181             : 
     182          90 :     return nsubs;
     183             : }
     184             : 
     185             : /*
     186             :  * Free a Subscription struct: its name, conninfo, slotname (if set), publications list, and the struct itself. NOTE(review): synccommit and origin, which GetSubscription() also fills in, are not freed here -- confirm whether that is intentional.
     187             :  */
     188             : void
     189          82 : FreeSubscription(Subscription *sub)
     190             : {
     191          82 :     pfree(sub->name);
     192          82 :     pfree(sub->conninfo);
     193          82 :     if (sub->slotname)      /* slotname is NULL when the catalog column was NULL */
     194          82 :         pfree(sub->slotname);
     195          82 :     list_free_deep(sub->publications);
     196          82 :     pfree(sub);
     197          82 : }
     198             : 
     199             : /*
     200             :  * Disable the given subscription by setting subenabled to false in its pg_subscription row. Reports elog(ERROR) if no row exists for 'subid'.
     201             :  */
     202             : void
     203           8 : DisableSubscription(Oid subid)
     204             : {
     205             :     Relation    rel;
     206             :     bool        nulls[Natts_pg_subscription];
     207             :     bool        replaces[Natts_pg_subscription];
     208             :     Datum       values[Natts_pg_subscription];
     209             :     HeapTuple   tup;
     210             : 
     211             :     /* Look up the subscription in the catalog; work on a private copy of the tuple */
     212           8 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     213           8 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     214             : 
     215           8 :     if (!HeapTupleIsValid(tup))
     216           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     217             : 
     218           8 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);    /* taken after the lookup above */
     219             : 
     220             :     /* Form a new tuple: start with no column marked for replacement. */
     221           8 :     memset(values, 0, sizeof(values));
     222           8 :     memset(nulls, false, sizeof(nulls));
     223           8 :     memset(replaces, false, sizeof(replaces));
     224             : 
     225             :     /* Set the subscription to disabled; subenabled is the only column replaced. */
     226           8 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     227           8 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
     228             : 
     229             :     /* Update the catalog */
     230           8 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     231             :                             replaces);
     232           8 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     233           8 :     heap_freetuple(tup);    /* frees the modified tuple; the copy fetched above is not explicitly freed */
     234             : 
     235           8 :     table_close(rel, NoLock);   /* NoLock: the table lock is not released here */
     236           8 : }
     237             : 
     238             : /*
     239             :  * Convert a text array to a List of String nodes, one per array element, in array order. Returns NIL for an array with no elements.
     240             :  *
     241             :  * Note: the resulting list of strings is pallocated here.
     242             :  */
     243             : static List *
     244        1600 : textarray_to_stringlist(ArrayType *textarray)
     245             : {
     246             :     Datum      *elems;
     247             :     int         nelems,
     248             :                 i;
     249        1600 :     List       *res = NIL;
     250             : 
     251        1600 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);   /* NULL: no per-element null flags are requested */
     252             : 
     253        1600 :     if (nelems == 0)    /* explicit early exit; the loop below would also leave res as NIL */
     254           0 :         return NIL;
     255             : 
     256        3918 :     for (i = 0; i < nelems; i++)
     257        2318 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     258             : 
     259        1600 :     return res;
     260             : }
     261             : 
     262             : /*
     263             :  * Add new state record for a subscription table: inserts a pg_subscription_rel row for (subid, relid) with the given state and LSN. Reports elog(ERROR) if a row for that pair already exists.
     264             :  *
     265             :  * If retain_lock is true, then don't release the locks taken in this function.
     266             :  * We normally release the locks at the end of transaction but in binary-upgrade
     267             :  * mode, we expect to release those immediately.
     268             :  */
     269             : void
     270         408 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     271             :                         XLogRecPtr sublsn, bool retain_lock)
     272             : {
     273             :     Relation    rel;
     274             :     HeapTuple   tup;
     275             :     bool        nulls[Natts_pg_subscription_rel];
     276             :     Datum       values[Natts_pg_subscription_rel];
     277             : 
     278         408 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     279             : 
     280         408 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     281             : 
     282             :     /* The mapping must not exist yet; finding one is an error. */
     283         408 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     284             :                               ObjectIdGetDatum(relid),
     285             :                               ObjectIdGetDatum(subid));
     286         408 :     if (HeapTupleIsValid(tup))
     287           0 :         elog(ERROR, "subscription table %u in subscription %u already exists",
     288             :              relid, subid);
     289             : 
     290             :     /* Form the tuple; an invalid sublsn is stored as a NULL srsublsn. */
     291         408 :     memset(values, 0, sizeof(values));
     292         408 :     memset(nulls, false, sizeof(nulls));
     293         408 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     294         408 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     295         408 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     296         408 :     if (sublsn != InvalidXLogRecPtr)
     297           2 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     298             :     else
     299         406 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     300             : 
     301         408 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     302             : 
     303             :     /* Insert tuple into catalog. */
     304         408 :     CatalogTupleInsert(rel, tup);
     305             : 
     306         408 :     heap_freetuple(tup);
     307             : 
     308             :     /* Cleanup: either keep both locks, or release the table lock and the shared-object lock now. */
     309         408 :     if (retain_lock)
     310             :     {
     311         404 :         table_close(rel, NoLock);
     312             :     }
     313             :     else
     314             :     {
     315           4 :         table_close(rel, RowExclusiveLock);
     316           4 :         UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     317             :     }
     318         408 : }
     319             : 
     320             : /*
     321             :  * Update the state and LSN of a subscription table. When 'already_locked' is true the caller must already hold the locks this function would otherwise take (checked only in assert-enabled builds). An invalid 'sublsn' is stored as NULL. Reports elog(ERROR) if no row exists for (subid, relid).
     322             :  */
     323             : void
     324        1474 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     325             :                            XLogRecPtr sublsn, bool already_locked)
     326             : {
     327             :     Relation    rel;
     328             :     HeapTuple   tup;
     329             :     bool        nulls[Natts_pg_subscription_rel];
     330             :     Datum       values[Natts_pg_subscription_rel];
     331             :     bool        replaces[Natts_pg_subscription_rel];
     332             : 
     333        1474 :     if (already_locked)
     334             :     {
     335             : #ifdef USE_ASSERT_CHECKING
     336             :         LOCKTAG     tag;
     337             : 
     338             :         Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
     339             :                                           RowExclusiveLock, true));
     340             :         SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
     341             :         Assert(LockHeldByMe(&tag, AccessShareLock, true));
     342             : #endif
     343             : 
     344         356 :         rel = table_open(SubscriptionRelRelationId, NoLock);    /* caller already holds the table lock */
     345             :     }
     346             :     else
     347             :     {
     348        1118 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     349        1118 :         rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     350             :     }
     351             : 
     352             :     /* The mapping must already exist; not finding it is an error. */
     353        1474 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     354             :                               ObjectIdGetDatum(relid),
     355             :                               ObjectIdGetDatum(subid));
     356        1474 :     if (!HeapTupleIsValid(tup))
     357           0 :         elog(ERROR, "subscription table %u in subscription %u does not exist",
     358             :              relid, subid);
     359             : 
     360             :     /* Update the tuple: only srsubstate and srsublsn are replaced. */
     361        1474 :     memset(values, 0, sizeof(values));
     362        1474 :     memset(nulls, false, sizeof(nulls));
     363        1474 :     memset(replaces, false, sizeof(replaces));
     364             : 
     365        1474 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     366        1474 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     367             : 
     368        1474 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     369        1474 :     if (sublsn != InvalidXLogRecPtr)
     370         722 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     371             :     else
     372         752 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     373             : 
     374        1474 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     375             :                             replaces);
     376             : 
     377             :     /* Update the catalog. */
     378        1474 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     379             : 
     380             :     /* Cleanup. NOTE(review): unlike DisableSubscription(), the modified tuple is not passed to heap_freetuple() here. */
     381        1474 :     table_close(rel, NoLock);
     382        1474 : }
     383             : 
     384             : /*
     385             :  * Get state of subscription table. '*sublsn' is set to the stored LSN, or to InvalidXLogRecPtr when the table is not in the subscription or its srsublsn column is NULL.
     386             :  *
     387             :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     388             :  */
     389             : char
     390        2404 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     391             : {
     392             :     HeapTuple   tup;
     393             :     char        substate;
     394             :     bool        isnull;
     395             :     Datum       d;
     396             :     Relation    rel;
     397             : 
     398             :     /*
     399             :      * This is to avoid the race condition with AlterSubscription which tries
     400             :      * to remove this relstate.
     401             :      */
     402        2404 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     403             : 
     404             :     /* Try finding the mapping. */
     405        2404 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     406             :                           ObjectIdGetDatum(relid),
     407             :                           ObjectIdGetDatum(subid));
     408             : 
     409        2404 :     if (!HeapTupleIsValid(tup))
     410             :     {
     411          48 :         table_close(rel, AccessShareLock);  /* not found: release the lock and report unknown */
     412          48 :         *sublsn = InvalidXLogRecPtr;
     413          48 :         return SUBREL_STATE_UNKNOWN;
     414             :     }
     415             : 
     416             :     /* Get the state. */
     417        2356 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     418             : 
     419             :     /* Get the LSN; a NULL column is reported as InvalidXLogRecPtr. */
     420        2356 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     421             :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     422        2356 :     if (isnull)
     423        1308 :         *sublsn = InvalidXLogRecPtr;
     424             :     else
     425        1048 :         *sublsn = DatumGetLSN(d);
     426             : 
     427             :     /* Cleanup */
     428        2356 :     ReleaseSysCache(tup);
     429             : 
     430        2356 :     table_close(rel, AccessShareLock);
     431             : 
     432        2356 :     return substate;
     433             : }
     434             : 
     435             : /*
     436             :  * Drop subscription relation mapping. These can be for a particular
     437             :  * subscription, or for a particular relation, or both. An invalid OID for 'subid' or 'relid' leaves that column out of the scan keys.
     438             :  */
     439             : void
     440       48854 : RemoveSubscriptionRel(Oid subid, Oid relid)
     441             : {
     442             :     Relation    rel;
     443             :     TableScanDesc scan;
     444             :     ScanKeyData skey[2];
     445             :     HeapTuple   tup;
     446       48854 :     int         nkeys = 0;      /* number of scan keys actually filled in (0 to 2) */
     447             : 
     448       48854 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     449             : 
     450       48854 :     if (OidIsValid(subid))
     451             :     {
     452         272 :         ScanKeyInit(&skey[nkeys++],
     453             :                     Anum_pg_subscription_rel_srsubid,
     454             :                     BTEqualStrategyNumber,
     455             :                     F_OIDEQ,
     456             :                     ObjectIdGetDatum(subid));
     457             :     }
     458             : 
     459       48854 :     if (OidIsValid(relid))
     460             :     {
     461       48624 :         ScanKeyInit(&skey[nkeys++],
     462             :                     Anum_pg_subscription_rel_srrelid,
     463             :                     BTEqualStrategyNumber,
     464             :                     F_OIDEQ,
     465             :                     ObjectIdGetDatum(relid));
     466             :     }
     467             : 
     468             :     /* Do the search and delete what we found. */
     469       48854 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     470       49084 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     471             :     {
     472             :         Form_pg_subscription_rel subrel;
     473             : 
     474         230 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     475             : 
     476             :         /*
     477             :          * We don't allow to drop the relation mapping when the table
     478             :          * synchronization is in progress unless the caller updates the
     479             :          * corresponding subscription as well. This is to ensure that we don't
     480             :          * leave tablesync slots or origins in the system when the
     481             :          * corresponding table is dropped.
     482             :          */
     483         230 :         if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
     484             :         {
     485           0 :             ereport(ERROR,
     486             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     487             :                      errmsg("could not drop relation mapping for subscription \"%s\"",
     488             :                             get_subscription_name(subrel->srsubid, false)),
     489             :                      errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
     490             :                                get_rel_name(relid), subrel->srsubstate),
     491             : 
     492             :             /*
     493             :              * translator: first %s is a SQL ALTER command and second %s is a
     494             :              * SQL DROP command
     495             :              */
     496             :                      errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
     497             :                              "ALTER SUBSCRIPTION ... ENABLE",
     498             :                              "DROP SUBSCRIPTION ...")));
     499             :         }
     500             : 
     501         230 :         CatalogTupleDelete(rel, &tup->t_self);
     502             :     }
     503       48854 :     table_endscan(scan);
     504             : 
     505       48854 :     table_close(rel, RowExclusiveLock);
     506       48854 : }
     507             : 
     508             : /*
     509             :  * Does the subscription have any relations, i.e. is there at least one pg_subscription_rel row whose srsubid equals 'subid'?
     510             :  *
     511             :  * Use this function only to know true/false, and when you have no need for the
     512             :  * List returned by GetSubscriptionRelations.
     513             :  */
     514             : bool
     515         500 : HasSubscriptionRelations(Oid subid)
     516             : {
     517             :     Relation    rel;
     518             :     ScanKeyData skey[1];
     519             :     SysScanDesc scan;
     520             :     bool        has_subrels;
     521             : 
     522         500 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     523             : 
     524         500 :     ScanKeyInit(&skey[0],
     525             :                 Anum_pg_subscription_rel_srsubid,
     526             :                 BTEqualStrategyNumber, F_OIDEQ,
     527             :                 ObjectIdGetDatum(subid));
     528             : 
     529         500 :     scan = systable_beginscan(rel, InvalidOid, false,
     530             :                               NULL, 1, skey);   /* no index is named (InvalidOid) */
     531             : 
     532             :     /* If even a single tuple exists then the subscription has tables; only the first match is fetched. */
     533         500 :     has_subrels = HeapTupleIsValid(systable_getnext(scan));
     534             : 
     535             :     /* Cleanup */
     536         500 :     systable_endscan(scan);
     537         500 :     table_close(rel, AccessShareLock);
     538             : 
     539         500 :     return has_subrels;
     540             : }
     541             : 
     542             : /*
     543             :  * Get the relations for the subscription, as a List of SubscriptionRelState structs (relid, state, lsn).
     544             :  *
     545             :  * If not_ready is true, return only the relations that are not in a ready
     546             :  * state, otherwise return all the relations of the subscription.  The
     547             :  * returned list is palloc'ed in the current memory context.
     548             :  */
     549             : List *
     550        2052 : GetSubscriptionRelations(Oid subid, bool not_ready)
     551             : {
     552        2052 :     List       *res = NIL;
     553             :     Relation    rel;
     554             :     HeapTuple   tup;
     555        2052 :     int         nkeys = 0;
     556             :     ScanKeyData skey[2];
     557             :     SysScanDesc scan;
     558             : 
     559        2052 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     560             : 
     561        2052 :     ScanKeyInit(&skey[nkeys++],
     562             :                 Anum_pg_subscription_rel_srsubid,
     563             :                 BTEqualStrategyNumber, F_OIDEQ,
     564             :                 ObjectIdGetDatum(subid));
     565             : 
     566        2052 :     if (not_ready)
     567        1984 :         ScanKeyInit(&skey[nkeys++],
     568             :                     Anum_pg_subscription_rel_srsubstate,
     569             :                     BTEqualStrategyNumber, F_CHARNE,    /* the not-equal test comes from F_CHARNE; presumably the strategy number is unused without an index -- verify */
     570             :                     CharGetDatum(SUBREL_STATE_READY));
     571             : 
     572        2052 :     scan = systable_beginscan(rel, InvalidOid, false,
     573             :                               NULL, nkeys, skey);
     574             : 
     575        5206 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     576             :     {
     577             :         Form_pg_subscription_rel subrel;
     578             :         SubscriptionRelState *relstate;
     579             :         Datum       d;
     580             :         bool        isnull;
     581             : 
     582        3154 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     583             : 
     584        3154 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
     585        3154 :         relstate->relid = subrel->srrelid;
     586        3154 :         relstate->state = subrel->srsubstate;
     587        3154 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     588             :                             Anum_pg_subscription_rel_srsublsn, &isnull);
     589        3154 :         if (isnull)     /* a NULL srsublsn is reported as InvalidXLogRecPtr */
     590        2618 :             relstate->lsn = InvalidXLogRecPtr;
     591             :         else
     592         536 :             relstate->lsn = DatumGetLSN(d);
     593             : 
     594        3154 :         res = lappend(res, relstate);
     595             :     }
     596             : 
     597             :     /* Cleanup */
     598        2052 :     systable_endscan(scan);
     599        2052 :     table_close(rel, AccessShareLock);
     600             : 
     601        2052 :     return res;
     602             : }
     603             : 
     604             : /*
     605             :  * Update the dead tuple retention status for the given subscription: set subretentionactive to 'active' in its pg_subscription row. Reports elog(ERROR) if no row exists for 'subid'.
     606             :  */
     607             : void
     608           4 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
     609             : {
     610             :     Relation    rel;
     611             :     bool        nulls[Natts_pg_subscription];
     612             :     bool        replaces[Natts_pg_subscription];
     613             :     Datum       values[Natts_pg_subscription];
     614             :     HeapTuple   tup;
     615             : 
     616             :     /* Look up the subscription in the catalog; work on a private copy of the tuple */
     617           4 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     618           4 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     619             : 
     620           4 :     if (!HeapTupleIsValid(tup))
     621           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     622             : 
     623           4 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     624             : 
     625             :     /* Form a new tuple: start with no column marked for replacement. */
     626           4 :     memset(values, 0, sizeof(values));
     627           4 :     memset(nulls, false, sizeof(nulls));
     628           4 :     memset(replaces, false, sizeof(replaces));
     629             : 
     630             :     /* Set the retention status to the requested value; it is the only column replaced. */
     631           4 :     values[Anum_pg_subscription_subretentionactive - 1] = BoolGetDatum(active);
     632           4 :     replaces[Anum_pg_subscription_subretentionactive - 1] = true;
     633             : 
     634             :     /* Update the catalog */
     635           4 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     636             :                             replaces);
     637           4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     638           4 :     heap_freetuple(tup);
     639             : 
     640           4 :     table_close(rel, NoLock);   /* NoLock: the table lock is not released here */
     641           4 : }

Generated by: LCOV version 1.16