LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 217 227 95.6 %
Date: 2025-10-31 11:17:53 Functions: 13 13 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_subscription.c
       4             :  *      replication subscriptions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_subscription.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "catalog/indexing.h"
      22             : #include "catalog/pg_subscription.h"
      23             : #include "catalog/pg_subscription_rel.h"
      24             : #include "catalog/pg_type.h"
      25             : #include "miscadmin.h"
      26             : #include "storage/lmgr.h"
      27             : #include "utils/array.h"
      28             : #include "utils/builtins.h"
      29             : #include "utils/fmgroids.h"
      30             : #include "utils/lsyscache.h"
      31             : #include "utils/pg_lsn.h"
      32             : #include "utils/rel.h"
      33             : #include "utils/syscache.h"
      34             : 
      35             : static List *textarray_to_stringlist(ArrayType *textarray);
      36             : 
      37             : /*
      38             :  * Add a comma-separated list of publication names to the 'dest' string.
      39             :  */
      40             : void
      41        1020 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
      42             : {
      43             :     ListCell   *lc;
      44        1020 :     bool        first = true;
      45             : 
      46             :     Assert(publications != NIL);
      47             : 
      48        2626 :     foreach(lc, publications)
      49             :     {
      50        1606 :         char       *pubname = strVal(lfirst(lc));
      51             : 
      52        1606 :         if (first)
      53        1020 :             first = false;
      54             :         else
      55         586 :             appendStringInfoString(dest, ", ");
      56             : 
      57        1606 :         if (quote_literal)
      58        1586 :             appendStringInfoString(dest, quote_literal_cstr(pubname));
      59             :         else
      60             :         {
      61          20 :             appendStringInfoChar(dest, '"');
      62          20 :             appendStringInfoString(dest, pubname);
      63          20 :             appendStringInfoChar(dest, '"');
      64             :         }
      65             :     }
      66        1020 : }
      67             : 
      68             : /*
      69             :  * Fetch the subscription from the syscache.
      70             :  */
      71             : Subscription *
      72        1708 : GetSubscription(Oid subid, bool missing_ok)
      73             : {
      74             :     HeapTuple   tup;
      75             :     Subscription *sub;
      76             :     Form_pg_subscription subform;
      77             :     Datum       datum;
      78             :     bool        isnull;
      79             : 
      80        1708 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      81             : 
      82        1708 :     if (!HeapTupleIsValid(tup))
      83             :     {
      84         122 :         if (missing_ok)
      85         122 :             return NULL;
      86             : 
      87           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
      88             :     }
      89             : 
      90        1586 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
      91             : 
      92        1586 :     sub = (Subscription *) palloc(sizeof(Subscription));
      93        1586 :     sub->oid = subid;
      94        1586 :     sub->dbid = subform->subdbid;
      95        1586 :     sub->skiplsn = subform->subskiplsn;
      96        1586 :     sub->name = pstrdup(NameStr(subform->subname));
      97        1586 :     sub->owner = subform->subowner;
      98        1586 :     sub->enabled = subform->subenabled;
      99        1586 :     sub->binary = subform->subbinary;
     100        1586 :     sub->stream = subform->substream;
     101        1586 :     sub->twophasestate = subform->subtwophasestate;
     102        1586 :     sub->disableonerr = subform->subdisableonerr;
     103        1586 :     sub->passwordrequired = subform->subpasswordrequired;
     104        1586 :     sub->runasowner = subform->subrunasowner;
     105        1586 :     sub->failover = subform->subfailover;
     106        1586 :     sub->retaindeadtuples = subform->subretaindeadtuples;
     107        1586 :     sub->maxretention = subform->submaxretention;
     108        1586 :     sub->retentionactive = subform->subretentionactive;
     109             : 
     110             :     /* Get conninfo */
     111        1586 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     112             :                                    tup,
     113             :                                    Anum_pg_subscription_subconninfo);
     114        1586 :     sub->conninfo = TextDatumGetCString(datum);
     115             : 
     116             :     /* Get slotname */
     117        1586 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     118             :                             tup,
     119             :                             Anum_pg_subscription_subslotname,
     120             :                             &isnull);
     121        1586 :     if (!isnull)
     122        1520 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
     123             :     else
     124          66 :         sub->slotname = NULL;
     125             : 
     126             :     /* Get synccommit */
     127        1586 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     128             :                                    tup,
     129             :                                    Anum_pg_subscription_subsynccommit);
     130        1586 :     sub->synccommit = TextDatumGetCString(datum);
     131             : 
     132             :     /* Get publications */
     133        1586 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     134             :                                    tup,
     135             :                                    Anum_pg_subscription_subpublications);
     136        1586 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     137             : 
     138             :     /* Get origin */
     139        1586 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     140             :                                    tup,
     141             :                                    Anum_pg_subscription_suborigin);
     142        1586 :     sub->origin = TextDatumGetCString(datum);
     143             : 
     144             :     /* Is the subscription owner a superuser? */
     145        1586 :     sub->ownersuperuser = superuser_arg(sub->owner);
     146             : 
     147        1586 :     ReleaseSysCache(tup);
     148             : 
     149        1586 :     return sub;
     150             : }
     151             : 
     152             : /*
     153             :  * Return number of subscriptions defined in given database.
     154             :  * Used by dropdb() to check if database can indeed be dropped.
     155             :  */
     156             : int
     157          90 : CountDBSubscriptions(Oid dbid)
     158             : {
     159          90 :     int         nsubs = 0;
     160             :     Relation    rel;
     161             :     ScanKeyData scankey;
     162             :     SysScanDesc scan;
     163             :     HeapTuple   tup;
     164             : 
     165          90 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     166             : 
     167          90 :     ScanKeyInit(&scankey,
     168             :                 Anum_pg_subscription_subdbid,
     169             :                 BTEqualStrategyNumber, F_OIDEQ,
     170             :                 ObjectIdGetDatum(dbid));
     171             : 
     172          90 :     scan = systable_beginscan(rel, InvalidOid, false,
     173             :                               NULL, 1, &scankey);
     174             : 
     175          90 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     176           0 :         nsubs++;
     177             : 
     178          90 :     systable_endscan(scan);
     179             : 
     180          90 :     table_close(rel, NoLock);
     181             : 
     182          90 :     return nsubs;
     183             : }
     184             : 
     185             : /*
     186             :  * Free memory allocated by subscription struct.
     187             :  */
     188             : void
     189          74 : FreeSubscription(Subscription *sub)
     190             : {
     191          74 :     pfree(sub->name);
     192          74 :     pfree(sub->conninfo);
     193          74 :     if (sub->slotname)
     194          74 :         pfree(sub->slotname);
     195          74 :     list_free_deep(sub->publications);
     196          74 :     pfree(sub);
     197          74 : }
     198             : 
     199             : /*
     200             :  * Disable the given subscription.
     201             :  */
     202             : void
     203           8 : DisableSubscription(Oid subid)
     204             : {
     205             :     Relation    rel;
     206             :     bool        nulls[Natts_pg_subscription];
     207             :     bool        replaces[Natts_pg_subscription];
     208             :     Datum       values[Natts_pg_subscription];
     209             :     HeapTuple   tup;
     210             : 
     211             :     /* Look up the subscription in the catalog */
     212           8 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     213           8 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     214             : 
     215           8 :     if (!HeapTupleIsValid(tup))
     216           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     217             : 
     218           8 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     219             : 
     220             :     /* Form a new tuple. */
     221           8 :     memset(values, 0, sizeof(values));
     222           8 :     memset(nulls, false, sizeof(nulls));
     223           8 :     memset(replaces, false, sizeof(replaces));
     224             : 
     225             :     /* Set the subscription to disabled. */
     226           8 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     227           8 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
     228             : 
     229             :     /* Update the catalog */
     230           8 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     231             :                             replaces);
     232           8 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     233           8 :     heap_freetuple(tup);
     234             : 
     235           8 :     table_close(rel, NoLock);
     236           8 : }
     237             : 
     238             : /*
     239             :  * Convert text array to list of strings.
     240             :  *
     241             :  * Note: the resulting list of strings is pallocated here.
     242             :  */
     243             : static List *
     244        1586 : textarray_to_stringlist(ArrayType *textarray)
     245             : {
     246             :     Datum      *elems;
     247             :     int         nelems,
     248             :                 i;
     249        1586 :     List       *res = NIL;
     250             : 
     251        1586 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
     252             : 
     253        1586 :     if (nelems == 0)
     254           0 :         return NIL;
     255             : 
     256        3884 :     for (i = 0; i < nelems; i++)
     257        2298 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     258             : 
     259        1586 :     return res;
     260             : }
     261             : 
     262             : /*
     263             :  * Add new state record for a subscription table.
     264             :  *
     265             :  * If retain_lock is true, then don't release the locks taken in this function.
     266             :  * We normally release the locks at the end of transaction but in binary-upgrade
     267             :  * mode, we expect to release those immediately.
     268             :  */
     269             : void
     270         412 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     271             :                         XLogRecPtr sublsn, bool retain_lock)
     272             : {
     273             :     Relation    rel;
     274             :     HeapTuple   tup;
     275             :     bool        nulls[Natts_pg_subscription_rel];
     276             :     Datum       values[Natts_pg_subscription_rel];
     277             : 
     278         412 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     279             : 
     280         412 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     281             : 
     282             :     /* Try finding existing mapping. */
     283         412 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     284             :                               ObjectIdGetDatum(relid),
     285             :                               ObjectIdGetDatum(subid));
     286         412 :     if (HeapTupleIsValid(tup))
     287           0 :         elog(ERROR, "subscription relation %u in subscription %u already exists",
     288             :              relid, subid);
     289             : 
     290             :     /* Form the tuple. */
     291         412 :     memset(values, 0, sizeof(values));
     292         412 :     memset(nulls, false, sizeof(nulls));
     293         412 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     294         412 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     295         412 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     296         412 :     if (sublsn != InvalidXLogRecPtr)
     297           2 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     298             :     else
     299         410 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     300             : 
     301         412 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     302             : 
     303             :     /* Insert tuple into catalog. */
     304         412 :     CatalogTupleInsert(rel, tup);
     305             : 
     306         412 :     heap_freetuple(tup);
     307             : 
     308             :     /* Cleanup. */
     309         412 :     if (retain_lock)
     310             :     {
     311         408 :         table_close(rel, NoLock);
     312             :     }
     313             :     else
     314             :     {
     315           4 :         table_close(rel, RowExclusiveLock);
     316           4 :         UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     317             :     }
     318         412 : }
     319             : 
     320             : /*
     321             :  * Update the state of a subscription table.
     322             :  */
     323             : void
     324        1478 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     325             :                            XLogRecPtr sublsn, bool already_locked)
     326             : {
     327             :     Relation    rel;
     328             :     HeapTuple   tup;
     329             :     bool        nulls[Natts_pg_subscription_rel];
     330             :     Datum       values[Natts_pg_subscription_rel];
     331             :     bool        replaces[Natts_pg_subscription_rel];
     332             : 
     333        1478 :     if (already_locked)
     334             :     {
     335             : #ifdef USE_ASSERT_CHECKING
     336             :         LOCKTAG     tag;
     337             : 
     338             :         Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
     339             :                                           RowExclusiveLock, true));
     340             :         SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
     341             :         Assert(LockHeldByMe(&tag, AccessShareLock, true));
     342             : #endif
     343             : 
     344         350 :         rel = table_open(SubscriptionRelRelationId, NoLock);
     345             :     }
     346             :     else
     347             :     {
     348        1128 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     349        1128 :         rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     350             :     }
     351             : 
     352             :     /* Try finding existing mapping. */
     353        1478 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     354             :                               ObjectIdGetDatum(relid),
     355             :                               ObjectIdGetDatum(subid));
     356        1478 :     if (!HeapTupleIsValid(tup))
     357           0 :         elog(ERROR, "subscription table %u in subscription %u does not exist",
     358             :              relid, subid);
     359             : 
     360             :     /* Update the tuple. */
     361        1478 :     memset(values, 0, sizeof(values));
     362        1478 :     memset(nulls, false, sizeof(nulls));
     363        1478 :     memset(replaces, false, sizeof(replaces));
     364             : 
     365        1478 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     366        1478 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     367             : 
     368        1478 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     369        1478 :     if (sublsn != InvalidXLogRecPtr)
     370         718 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     371             :     else
     372         760 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     373             : 
     374        1478 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     375             :                             replaces);
     376             : 
     377             :     /* Update the catalog. */
     378        1478 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     379             : 
     380             :     /* Cleanup. */
     381        1478 :     table_close(rel, NoLock);
     382        1478 : }
     383             : 
     384             : /*
     385             :  * Get state of subscription table.
     386             :  *
     387             :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     388             :  */
     389             : char
     390        2406 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     391             : {
     392             :     HeapTuple   tup;
     393             :     char        substate;
     394             :     bool        isnull;
     395             :     Datum       d;
     396             :     Relation    rel;
     397             : 
     398             :     /*
     399             :      * This is to avoid the race condition with AlterSubscription which tries
     400             :      * to remove this relstate.
     401             :      */
     402        2406 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     403             : 
     404             :     /* Try finding the mapping. */
     405        2406 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     406             :                           ObjectIdGetDatum(relid),
     407             :                           ObjectIdGetDatum(subid));
     408             : 
     409        2406 :     if (!HeapTupleIsValid(tup))
     410             :     {
     411          50 :         table_close(rel, AccessShareLock);
     412          50 :         *sublsn = InvalidXLogRecPtr;
     413          50 :         return SUBREL_STATE_UNKNOWN;
     414             :     }
     415             : 
     416             :     /* Get the state. */
     417        2356 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     418             : 
     419             :     /* Get the LSN */
     420        2356 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     421             :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     422        2356 :     if (isnull)
     423        1296 :         *sublsn = InvalidXLogRecPtr;
     424             :     else
     425        1060 :         *sublsn = DatumGetLSN(d);
     426             : 
     427             :     /* Cleanup */
     428        2356 :     ReleaseSysCache(tup);
     429             : 
     430        2356 :     table_close(rel, AccessShareLock);
     431             : 
     432        2356 :     return substate;
     433             : }
     434             : 
     435             : /*
     436             :  * Drop subscription relation mapping. These can be for a particular
     437             :  * subscription, or for a particular relation, or both.
     438             :  */
     439             : void
     440       49198 : RemoveSubscriptionRel(Oid subid, Oid relid)
     441             : {
     442             :     Relation    rel;
     443             :     TableScanDesc scan;
     444             :     ScanKeyData skey[2];
     445             :     HeapTuple   tup;
     446       49198 :     int         nkeys = 0;
     447             : 
     448       49198 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     449             : 
     450       49198 :     if (OidIsValid(subid))
     451             :     {
     452         272 :         ScanKeyInit(&skey[nkeys++],
     453             :                     Anum_pg_subscription_rel_srsubid,
     454             :                     BTEqualStrategyNumber,
     455             :                     F_OIDEQ,
     456             :                     ObjectIdGetDatum(subid));
     457             :     }
     458             : 
     459       49198 :     if (OidIsValid(relid))
     460             :     {
     461       48968 :         ScanKeyInit(&skey[nkeys++],
     462             :                     Anum_pg_subscription_rel_srrelid,
     463             :                     BTEqualStrategyNumber,
     464             :                     F_OIDEQ,
     465             :                     ObjectIdGetDatum(relid));
     466             :     }
     467             : 
     468             :     /* Do the search and delete what we found. */
     469       49198 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     470       49428 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     471             :     {
     472             :         Form_pg_subscription_rel subrel;
     473             : 
     474         230 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     475             : 
     476             :         /*
     477             :          * We don't allow to drop the relation mapping when the table
     478             :          * synchronization is in progress unless the caller updates the
     479             :          * corresponding subscription as well. This is to ensure that we don't
     480             :          * leave tablesync slots or origins in the system when the
     481             :          * corresponding table is dropped. For sequences, however, it's ok to
     482             :          * drop them since no separate slots or origins are created during
     483             :          * synchronization.
     484             :          */
     485         230 :         if (!OidIsValid(subid) &&
     486          32 :             subrel->srsubstate != SUBREL_STATE_READY &&
     487           0 :             get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
     488             :         {
     489           0 :             ereport(ERROR,
     490             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     491             :                      errmsg("could not drop relation mapping for subscription \"%s\"",
     492             :                             get_subscription_name(subrel->srsubid, false)),
     493             :                      errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
     494             :                                get_rel_name(relid), subrel->srsubstate),
     495             : 
     496             :             /*
     497             :              * translator: first %s is a SQL ALTER command and second %s is a
     498             :              * SQL DROP command
     499             :              */
     500             :                      errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
     501             :                              "ALTER SUBSCRIPTION ... ENABLE",
     502             :                              "DROP SUBSCRIPTION ...")));
     503             :         }
     504             : 
     505         230 :         CatalogTupleDelete(rel, &tup->t_self);
     506             :     }
     507       49198 :     table_endscan(scan);
     508             : 
     509       49198 :     table_close(rel, RowExclusiveLock);
     510       49198 : }
     511             : 
     512             : /*
     513             :  * Does the subscription have any tables?
     514             :  *
     515             :  * Use this function only to know true/false, and when you have no need for the
     516             :  * List returned by GetSubscriptionRelations.
     517             :  */
     518             : bool
     519         490 : HasSubscriptionTables(Oid subid)
     520             : {
     521             :     Relation    rel;
     522             :     ScanKeyData skey[1];
     523             :     SysScanDesc scan;
     524             :     HeapTuple   tup;
     525         490 :     bool        has_subtables = false;
     526             : 
     527         490 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     528             : 
     529         490 :     ScanKeyInit(&skey[0],
     530             :                 Anum_pg_subscription_rel_srsubid,
     531             :                 BTEqualStrategyNumber, F_OIDEQ,
     532             :                 ObjectIdGetDatum(subid));
     533             : 
     534         490 :     scan = systable_beginscan(rel, InvalidOid, false,
     535             :                               NULL, 1, skey);
     536             : 
     537         492 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     538             :     {
     539             :         Form_pg_subscription_rel subrel;
     540             :         char        relkind;
     541             : 
     542         478 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     543         478 :         relkind = get_rel_relkind(subrel->srrelid);
     544             : 
     545         478 :         if (relkind == RELKIND_RELATION ||
     546             :             relkind == RELKIND_PARTITIONED_TABLE)
     547             :         {
     548         476 :             has_subtables = true;
     549         476 :             break;
     550             :         }
     551             :     }
     552             : 
     553             :     /* Cleanup */
     554         490 :     systable_endscan(scan);
     555         490 :     table_close(rel, AccessShareLock);
     556             : 
     557         490 :     return has_subtables;
     558             : }
     559             : 
     560             : /*
     561             :  * Get the relations for the subscription.
     562             :  *
     563             :  * If not_ready is true, return only the relations that are not in a ready
     564             :  * state, otherwise return all the relations of the subscription.  The
     565             :  * returned list is palloc'ed in the current memory context.
     566             :  */
     567             : List *
     568        2012 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
     569             :                          bool not_ready)
     570             : {
     571        2012 :     List       *res = NIL;
     572             :     Relation    rel;
     573             :     HeapTuple   tup;
     574        2012 :     int         nkeys = 0;
     575             :     ScanKeyData skey[2];
     576             :     SysScanDesc scan;
     577             : 
     578             :     /* One or both of 'tables' and 'sequences' must be true. */
     579             :     Assert(tables || sequences);
     580             : 
     581        2012 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     582             : 
     583        2012 :     ScanKeyInit(&skey[nkeys++],
     584             :                 Anum_pg_subscription_rel_srsubid,
     585             :                 BTEqualStrategyNumber, F_OIDEQ,
     586             :                 ObjectIdGetDatum(subid));
     587             : 
     588        2012 :     if (not_ready)
     589        1944 :         ScanKeyInit(&skey[nkeys++],
     590             :                     Anum_pg_subscription_rel_srsubstate,
     591             :                     BTEqualStrategyNumber, F_CHARNE,
     592             :                     CharGetDatum(SUBREL_STATE_READY));
     593             : 
     594        2012 :     scan = systable_beginscan(rel, InvalidOid, false,
     595             :                               NULL, nkeys, skey);
     596             : 
     597        5080 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     598             :     {
     599             :         Form_pg_subscription_rel subrel;
     600             :         SubscriptionRelState *relstate;
     601             :         Datum       d;
     602             :         bool        isnull;
     603             :         char        relkind;
     604             : 
     605        3068 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     606             : 
     607             :         /* Relation is either a sequence or a table */
     608        3068 :         relkind = get_rel_relkind(subrel->srrelid);
     609             :         Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
     610             :                relkind == RELKIND_PARTITIONED_TABLE);
     611             : 
     612             :         /* Skip sequences if they were not requested */
     613        3068 :         if ((relkind == RELKIND_SEQUENCE) && !sequences)
     614           2 :             continue;
     615             : 
     616             :         /* Skip tables if they were not requested */
     617        3066 :         if ((relkind == RELKIND_RELATION ||
     618        3066 :              relkind == RELKIND_PARTITIONED_TABLE) && !tables)
     619           0 :             continue;
     620             : 
     621        3066 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
     622        3066 :         relstate->relid = subrel->srrelid;
     623        3066 :         relstate->state = subrel->srsubstate;
     624        3066 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     625             :                             Anum_pg_subscription_rel_srsublsn, &isnull);
     626        3066 :         if (isnull)
     627        2536 :             relstate->lsn = InvalidXLogRecPtr;
     628             :         else
     629         530 :             relstate->lsn = DatumGetLSN(d);
     630             : 
     631        3066 :         res = lappend(res, relstate);
     632             :     }
     633             : 
     634             :     /* Cleanup */
     635        2012 :     systable_endscan(scan);
     636        2012 :     table_close(rel, AccessShareLock);
     637             : 
     638        2012 :     return res;
     639             : }
     640             : 
     641             : /*
     642             :  * Update the dead tuple retention status for the given subscription.
     643             :  */
     644             : void
     645           4 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
     646             : {
     647             :     Relation    rel;
     648             :     bool        nulls[Natts_pg_subscription];
     649             :     bool        replaces[Natts_pg_subscription];
     650             :     Datum       values[Natts_pg_subscription];
     651             :     HeapTuple   tup;
     652             : 
     653             :     /* Look up the subscription in the catalog */
     654           4 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     655           4 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     656             : 
     657           4 :     if (!HeapTupleIsValid(tup))
     658           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     659             : 
     660           4 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     661             : 
     662             :     /* Form a new tuple. */
     663           4 :     memset(values, 0, sizeof(values));
     664           4 :     memset(nulls, false, sizeof(nulls));
     665           4 :     memset(replaces, false, sizeof(replaces));
     666             : 
     667             :     /* Set the subscription to disabled. */
     668           4 :     values[Anum_pg_subscription_subretentionactive - 1] = active;
     669           4 :     replaces[Anum_pg_subscription_subretentionactive - 1] = true;
     670             : 
     671             :     /* Update the catalog */
     672           4 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     673             :                             replaces);
     674           4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     675           4 :     heap_freetuple(tup);
     676             : 
     677           4 :     table_close(rel, NoLock);
     678           4 : }

Generated by: LCOV version 1.16