LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 95.2 % 229 218
Test Date: 2026-03-04 20:14:49 Functions: 100.0 % 13 13
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_subscription.c
       4              :  *      replication subscriptions
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/backend/catalog/pg_subscription.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/genam.h"
      18              : #include "access/heapam.h"
      19              : #include "access/htup_details.h"
      20              : #include "access/tableam.h"
      21              : #include "catalog/indexing.h"
      22              : #include "catalog/pg_subscription.h"
      23              : #include "catalog/pg_subscription_rel.h"
      24              : #include "catalog/pg_type.h"
      25              : #include "miscadmin.h"
      26              : #include "storage/lmgr.h"
      27              : #include "utils/array.h"
      28              : #include "utils/builtins.h"
      29              : #include "utils/fmgroids.h"
      30              : #include "utils/lsyscache.h"
      31              : #include "utils/pg_lsn.h"
      32              : #include "utils/rel.h"
      33              : #include "utils/syscache.h"
      34              : 
      35              : static List *textarray_to_stringlist(ArrayType *textarray);
      36              : 
      37              : /*
      38              :  * Add a comma-separated list of publication names to the 'dest' string.
      39              :  */
      40              : void
      41          534 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
      42              : {
      43              :     ListCell   *lc;
      44          534 :     bool        first = true;
      45              : 
      46              :     Assert(publications != NIL);
      47              : 
      48         1376 :     foreach(lc, publications)
      49              :     {
      50          842 :         char       *pubname = strVal(lfirst(lc));
      51              : 
      52          842 :         if (first)
      53          534 :             first = false;
      54              :         else
      55          308 :             appendStringInfoString(dest, ", ");
      56              : 
      57          842 :         if (quote_literal)
      58          832 :             appendStringInfoString(dest, quote_literal_cstr(pubname));
      59              :         else
      60              :         {
      61           10 :             appendStringInfoChar(dest, '"');
      62           10 :             appendStringInfoString(dest, pubname);
      63           10 :             appendStringInfoChar(dest, '"');
      64              :         }
      65              :     }
      66          534 : }
      67              : 
      68              : /*
      69              :  * Fetch the subscription from the syscache.
      70              :  */
      71              : Subscription *
      72          924 : GetSubscription(Oid subid, bool missing_ok)
      73              : {
      74              :     HeapTuple   tup;
      75              :     Subscription *sub;
      76              :     Form_pg_subscription subform;
      77              :     Datum       datum;
      78              :     bool        isnull;
      79              : 
      80          924 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      81              : 
      82          924 :     if (!HeapTupleIsValid(tup))
      83              :     {
      84           66 :         if (missing_ok)
      85           66 :             return NULL;
      86              : 
      87            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
      88              :     }
      89              : 
      90          858 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
      91              : 
      92          858 :     sub = palloc_object(Subscription);
      93          858 :     sub->oid = subid;
      94          858 :     sub->dbid = subform->subdbid;
      95          858 :     sub->skiplsn = subform->subskiplsn;
      96          858 :     sub->name = pstrdup(NameStr(subform->subname));
      97          858 :     sub->owner = subform->subowner;
      98          858 :     sub->enabled = subform->subenabled;
      99          858 :     sub->binary = subform->subbinary;
     100          858 :     sub->stream = subform->substream;
     101          858 :     sub->twophasestate = subform->subtwophasestate;
     102          858 :     sub->disableonerr = subform->subdisableonerr;
     103          858 :     sub->passwordrequired = subform->subpasswordrequired;
     104          858 :     sub->runasowner = subform->subrunasowner;
     105          858 :     sub->failover = subform->subfailover;
     106          858 :     sub->retaindeadtuples = subform->subretaindeadtuples;
     107          858 :     sub->maxretention = subform->submaxretention;
     108          858 :     sub->retentionactive = subform->subretentionactive;
     109              : 
     110              :     /* Get conninfo */
     111          858 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     112              :                                    tup,
     113              :                                    Anum_pg_subscription_subconninfo);
     114          858 :     sub->conninfo = TextDatumGetCString(datum);
     115              : 
     116              :     /* Get slotname */
     117          858 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     118              :                             tup,
     119              :                             Anum_pg_subscription_subslotname,
     120              :                             &isnull);
     121          858 :     if (!isnull)
     122          825 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
     123              :     else
     124           33 :         sub->slotname = NULL;
     125              : 
     126              :     /* Get synccommit */
     127          858 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     128              :                                    tup,
     129              :                                    Anum_pg_subscription_subsynccommit);
     130          858 :     sub->synccommit = TextDatumGetCString(datum);
     131              : 
     132              :     /* Get walrcvtimeout */
     133          858 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     134              :                                    tup,
     135              :                                    Anum_pg_subscription_subwalrcvtimeout);
     136          858 :     sub->walrcvtimeout = TextDatumGetCString(datum);
     137              : 
     138              :     /* Get publications */
     139          858 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     140              :                                    tup,
     141              :                                    Anum_pg_subscription_subpublications);
     142          858 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     143              : 
     144              :     /* Get origin */
     145          858 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     146              :                                    tup,
     147              :                                    Anum_pg_subscription_suborigin);
     148          858 :     sub->origin = TextDatumGetCString(datum);
     149              : 
     150              :     /* Is the subscription owner a superuser? */
     151          858 :     sub->ownersuperuser = superuser_arg(sub->owner);
     152              : 
     153          858 :     ReleaseSysCache(tup);
     154              : 
     155          858 :     return sub;
     156              : }
     157              : 
     158              : /*
     159              :  * Return number of subscriptions defined in given database.
     160              :  * Used by dropdb() to check if database can indeed be dropped.
     161              :  */
     162              : int
     163           47 : CountDBSubscriptions(Oid dbid)
     164              : {
     165           47 :     int         nsubs = 0;
     166              :     Relation    rel;
     167              :     ScanKeyData scankey;
     168              :     SysScanDesc scan;
     169              :     HeapTuple   tup;
     170              : 
     171           47 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     172              : 
     173           47 :     ScanKeyInit(&scankey,
     174              :                 Anum_pg_subscription_subdbid,
     175              :                 BTEqualStrategyNumber, F_OIDEQ,
     176              :                 ObjectIdGetDatum(dbid));
     177              : 
     178           47 :     scan = systable_beginscan(rel, InvalidOid, false,
     179              :                               NULL, 1, &scankey);
     180              : 
     181           47 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     182            0 :         nsubs++;
     183              : 
     184           47 :     systable_endscan(scan);
     185              : 
     186           47 :     table_close(rel, NoLock);
     187              : 
     188           47 :     return nsubs;
     189              : }
     190              : 
     191              : /*
     192              :  * Free memory allocated by subscription struct.
     193              :  */
     194              : void
     195           31 : FreeSubscription(Subscription *sub)
     196              : {
     197           31 :     pfree(sub->name);
     198           31 :     pfree(sub->conninfo);
     199           31 :     if (sub->slotname)
     200           31 :         pfree(sub->slotname);
     201           31 :     list_free_deep(sub->publications);
     202           31 :     pfree(sub);
     203           31 : }
     204              : 
     205              : /*
     206              :  * Disable the given subscription.
     207              :  */
     208              : void
     209            4 : DisableSubscription(Oid subid)
     210              : {
     211              :     Relation    rel;
     212              :     bool        nulls[Natts_pg_subscription];
     213              :     bool        replaces[Natts_pg_subscription];
     214              :     Datum       values[Natts_pg_subscription];
     215              :     HeapTuple   tup;
     216              : 
     217              :     /* Look up the subscription in the catalog */
     218            4 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     219            4 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     220              : 
     221            4 :     if (!HeapTupleIsValid(tup))
     222            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     223              : 
     224            4 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     225              : 
     226              :     /* Form a new tuple. */
     227            4 :     memset(values, 0, sizeof(values));
     228            4 :     memset(nulls, false, sizeof(nulls));
     229            4 :     memset(replaces, false, sizeof(replaces));
     230              : 
     231              :     /* Set the subscription to disabled. */
     232            4 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     233            4 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
     234              : 
     235              :     /* Update the catalog */
     236            4 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     237              :                             replaces);
     238            4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     239            4 :     heap_freetuple(tup);
     240              : 
     241            4 :     table_close(rel, NoLock);
     242            4 : }
     243              : 
     244              : /*
     245              :  * Convert text array to list of strings.
     246              :  *
     247              :  * Note: the resulting list of strings is pallocated here.
     248              :  */
     249              : static List *
     250          858 : textarray_to_stringlist(ArrayType *textarray)
     251              : {
     252              :     Datum      *elems;
     253              :     int         nelems,
     254              :                 i;
     255          858 :     List       *res = NIL;
     256              : 
     257          858 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
     258              : 
     259          858 :     if (nelems == 0)
     260            0 :         return NIL;
     261              : 
     262         2110 :     for (i = 0; i < nelems; i++)
     263         1252 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     264              : 
     265          858 :     return res;
     266              : }
     267              : 
     268              : /*
     269              :  * Add new state record for a subscription table.
     270              :  *
     271              :  * If retain_lock is true, then don't release the locks taken in this function.
     272              :  * We normally release the locks at the end of transaction but in binary-upgrade
     273              :  * mode, we expect to release those immediately.
     274              :  */
     275              : void
     276          222 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     277              :                         XLogRecPtr sublsn, bool retain_lock)
     278              : {
     279              :     Relation    rel;
     280              :     HeapTuple   tup;
     281              :     bool        nulls[Natts_pg_subscription_rel];
     282              :     Datum       values[Natts_pg_subscription_rel];
     283              : 
     284          222 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     285              : 
     286          222 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     287              : 
     288              :     /* Try finding existing mapping. */
     289          222 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     290              :                               ObjectIdGetDatum(relid),
     291              :                               ObjectIdGetDatum(subid));
     292          222 :     if (HeapTupleIsValid(tup))
     293            0 :         elog(ERROR, "subscription relation %u in subscription %u already exists",
     294              :              relid, subid);
     295              : 
     296              :     /* Form the tuple. */
     297          222 :     memset(values, 0, sizeof(values));
     298          222 :     memset(nulls, false, sizeof(nulls));
     299          222 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     300          222 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     301          222 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     302          222 :     if (XLogRecPtrIsValid(sublsn))
     303            2 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     304              :     else
     305          220 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     306              : 
     307          222 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     308              : 
     309              :     /* Insert tuple into catalog. */
     310          222 :     CatalogTupleInsert(rel, tup);
     311              : 
     312          222 :     heap_freetuple(tup);
     313              : 
     314              :     /* Cleanup. */
     315          222 :     if (retain_lock)
     316              :     {
     317          219 :         table_close(rel, NoLock);
     318              :     }
     319              :     else
     320              :     {
     321            3 :         table_close(rel, RowExclusiveLock);
     322            3 :         UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     323              :     }
     324          222 : }
     325              : 
     326              : /*
     327              :  * Update the state of a subscription table.
     328              :  */
     329              : void
     330          786 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     331              :                            XLogRecPtr sublsn, bool already_locked)
     332              : {
     333              :     Relation    rel;
     334              :     HeapTuple   tup;
     335              :     bool        nulls[Natts_pg_subscription_rel];
     336              :     Datum       values[Natts_pg_subscription_rel];
     337              :     bool        replaces[Natts_pg_subscription_rel];
     338              : 
     339          786 :     if (already_locked)
     340              :     {
     341              : #ifdef USE_ASSERT_CHECKING
     342              :         LOCKTAG     tag;
     343              : 
     344              :         Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
     345              :                                           RowExclusiveLock, true));
     346              :         SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
     347              :         Assert(LockHeldByMe(&tag, AccessShareLock, true));
     348              : #endif
     349              : 
     350          184 :         rel = table_open(SubscriptionRelRelationId, NoLock);
     351              :     }
     352              :     else
     353              :     {
     354          602 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     355          602 :         rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     356              :     }
     357              : 
     358              :     /* Try finding existing mapping. */
     359          786 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     360              :                               ObjectIdGetDatum(relid),
     361              :                               ObjectIdGetDatum(subid));
     362          786 :     if (!HeapTupleIsValid(tup))
     363            0 :         elog(ERROR, "subscription relation %u in subscription %u does not exist",
     364              :              relid, subid);
     365              : 
     366              :     /* Update the tuple. */
     367          786 :     memset(values, 0, sizeof(values));
     368          786 :     memset(nulls, false, sizeof(nulls));
     369          786 :     memset(replaces, false, sizeof(replaces));
     370              : 
     371          786 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     372          786 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     373              : 
     374          786 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     375          786 :     if (XLogRecPtrIsValid(sublsn))
     376          384 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     377              :     else
     378          402 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     379              : 
     380          786 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     381              :                             replaces);
     382              : 
     383              :     /* Update the catalog. */
     384          786 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     385              : 
     386              :     /* Cleanup. */
     387          786 :     table_close(rel, NoLock);
     388          786 : }
     389              : 
     390              : /*
     391              :  * Get state of subscription table.
     392              :  *
     393              :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     394              :  */
     395              : char
     396         4247 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     397              : {
     398              :     HeapTuple   tup;
     399              :     char        substate;
     400              :     bool        isnull;
     401              :     Datum       d;
     402              :     Relation    rel;
     403              : 
     404              :     /*
     405              :      * This is to avoid the race condition with AlterSubscription which tries
     406              :      * to remove this relstate.
     407              :      */
     408         4247 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     409              : 
     410              :     /* Try finding the mapping. */
     411         4247 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     412              :                           ObjectIdGetDatum(relid),
     413              :                           ObjectIdGetDatum(subid));
     414              : 
     415         4247 :     if (!HeapTupleIsValid(tup))
     416              :     {
     417           25 :         table_close(rel, AccessShareLock);
     418           25 :         *sublsn = InvalidXLogRecPtr;
     419           25 :         return SUBREL_STATE_UNKNOWN;
     420              :     }
     421              : 
     422              :     /* Get the state. */
     423         4222 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     424              : 
     425              :     /* Get the LSN */
     426         4222 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     427              :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     428         4222 :     if (isnull)
     429         3649 :         *sublsn = InvalidXLogRecPtr;
     430              :     else
     431          573 :         *sublsn = DatumGetLSN(d);
     432              : 
     433              :     /* Cleanup */
     434         4222 :     ReleaseSysCache(tup);
     435              : 
     436         4222 :     table_close(rel, AccessShareLock);
     437              : 
     438         4222 :     return substate;
     439              : }
     440              : 
     441              : /*
     442              :  * Drop subscription relation mapping. These can be for a particular
     443              :  * subscription, or for a particular relation, or both.
     444              :  */
     445              : void
     446        26130 : RemoveSubscriptionRel(Oid subid, Oid relid)
     447              : {
     448              :     Relation    rel;
     449              :     TableScanDesc scan;
     450              :     ScanKeyData skey[2];
     451              :     HeapTuple   tup;
     452        26130 :     int         nkeys = 0;
     453              : 
     454        26130 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     455              : 
     456        26130 :     if (OidIsValid(subid))
     457              :     {
     458          142 :         ScanKeyInit(&skey[nkeys++],
     459              :                     Anum_pg_subscription_rel_srsubid,
     460              :                     BTEqualStrategyNumber,
     461              :                     F_OIDEQ,
     462              :                     ObjectIdGetDatum(subid));
     463              :     }
     464              : 
     465        26130 :     if (OidIsValid(relid))
     466              :     {
     467        26009 :         ScanKeyInit(&skey[nkeys++],
     468              :                     Anum_pg_subscription_rel_srrelid,
     469              :                     BTEqualStrategyNumber,
     470              :                     F_OIDEQ,
     471              :                     ObjectIdGetDatum(relid));
     472              :     }
     473              : 
     474              :     /* Do the search and delete what we found. */
     475        26130 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     476        26256 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     477              :     {
     478              :         Form_pg_subscription_rel subrel;
     479              : 
     480          126 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     481              : 
     482              :         /*
     483              :          * We don't allow to drop the relation mapping when the table
     484              :          * synchronization is in progress unless the caller updates the
     485              :          * corresponding subscription as well. This is to ensure that we don't
     486              :          * leave tablesync slots or origins in the system when the
     487              :          * corresponding table is dropped. For sequences, however, it's ok to
     488              :          * drop them since no separate slots or origins are created during
     489              :          * synchronization.
     490              :          */
     491          126 :         if (!OidIsValid(subid) &&
     492           16 :             subrel->srsubstate != SUBREL_STATE_READY &&
     493            0 :             get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
     494              :         {
     495            0 :             ereport(ERROR,
     496              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     497              :                      errmsg("could not drop relation mapping for subscription \"%s\"",
     498              :                             get_subscription_name(subrel->srsubid, false)),
     499              :                      errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
     500              :                                get_rel_name(relid), subrel->srsubstate),
     501              : 
     502              :             /*
     503              :              * translator: first %s is a SQL ALTER command and second %s is a
     504              :              * SQL DROP command
     505              :              */
     506              :                      errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
     507              :                              "ALTER SUBSCRIPTION ... ENABLE",
     508              :                              "DROP SUBSCRIPTION ...")));
     509              :         }
     510              : 
     511          126 :         CatalogTupleDelete(rel, &tup->t_self);
     512              :     }
     513        26130 :     table_endscan(scan);
     514              : 
     515        26130 :     table_close(rel, RowExclusiveLock);
     516        26130 : }
     517              : 
     518              : /*
     519              :  * Does the subscription have any tables?
     520              :  *
     521              :  * Use this function only to know true/false, and when you have no need for the
     522              :  * List returned by GetSubscriptionRelations.
     523              :  */
     524              : bool
     525          274 : HasSubscriptionTables(Oid subid)
     526              : {
     527              :     Relation    rel;
     528              :     ScanKeyData skey[1];
     529              :     SysScanDesc scan;
     530              :     HeapTuple   tup;
     531          274 :     bool        has_subtables = false;
     532              : 
     533          274 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     534              : 
     535          274 :     ScanKeyInit(&skey[0],
     536              :                 Anum_pg_subscription_rel_srsubid,
     537              :                 BTEqualStrategyNumber, F_OIDEQ,
     538              :                 ObjectIdGetDatum(subid));
     539              : 
     540          274 :     scan = systable_beginscan(rel, InvalidOid, false,
     541              :                               NULL, 1, skey);
     542              : 
     543          311 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     544              :     {
     545              :         Form_pg_subscription_rel subrel;
     546              :         char        relkind;
     547              : 
     548          296 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     549          296 :         relkind = get_rel_relkind(subrel->srrelid);
     550              : 
     551          296 :         if (relkind == RELKIND_RELATION ||
     552              :             relkind == RELKIND_PARTITIONED_TABLE)
     553              :         {
     554          259 :             has_subtables = true;
     555          259 :             break;
     556              :         }
     557              :     }
     558              : 
     559              :     /* Cleanup */
     560          274 :     systable_endscan(scan);
     561          274 :     table_close(rel, AccessShareLock);
     562              : 
     563          274 :     return has_subtables;
     564              : }
     565              : 
     566              : /*
     567              :  * Get the relations (tables and/or sequences) recorded for subscription 'subid'.
     568              :  *
     569              :  * 'tables' and 'sequences' select which relation kinds are returned; at least one must be true.  If not_ready is true, return only the relations that are not in a ready
     570              :  * state, otherwise return all the requested relations of the subscription.  The result is a List of SubscriptionRelState (NIL if nothing matches); lsn is InvalidXLogRecPtr when srsublsn is NULL.  The
     571              :  * returned list is palloc'ed in the current memory context.
     572              :  */
     573              : List *
     574         1121 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
     575              :                          bool not_ready)
     576              : {
     577         1121 :     List       *res = NIL;
     578              :     Relation    rel;
     579              :     HeapTuple   tup;
     580         1121 :     int         nkeys = 0;
     581              :     ScanKeyData skey[2];
     582              :     SysScanDesc scan;
     583              : 
     584              :     /* One or both of 'tables' and 'sequences' must be true. */
     585              :     Assert(tables || sequences);
     586              : 
     587         1121 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     588              : 
     589         1121 :     ScanKeyInit(&skey[nkeys++],
     590              :                 Anum_pg_subscription_rel_srsubid,
     591              :                 BTEqualStrategyNumber, F_OIDEQ,
     592              :                 ObjectIdGetDatum(subid));
     593              : 
     594         1121 :     if (not_ready)
     595         1083 :         ScanKeyInit(&skey[nkeys++],
     596              :                     Anum_pg_subscription_rel_srsubstate,
     597              :                     BTEqualStrategyNumber, F_CHARNE,
     598              :                     CharGetDatum(SUBREL_STATE_READY));
     599              : 
     600         1121 :     scan = systable_beginscan(rel, InvalidOid, false,
     601              :                               NULL, nkeys, skey);
     602              : 
     603         2945 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     604              :     {
     605              :         Form_pg_subscription_rel subrel;
     606              :         SubscriptionRelState *relstate;
     607              :         Datum       d;
     608              :         bool        isnull;
     609              :         char        relkind;
     610              : 
     611         1824 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     612              : 
     613              :         /* Every tracked relation is expected to be a sequence or a (partitioned) table */
     614         1824 :         relkind = get_rel_relkind(subrel->srrelid);
     615              :         Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
     616              :                relkind == RELKIND_PARTITIONED_TABLE);
     617              : 
     618              :         /* Skip sequences if they were not requested */
     619         1824 :         if ((relkind == RELKIND_SEQUENCE) && !sequences)
     620            0 :             continue;
     621              : 
     622              :         /* Skip tables (plain or partitioned) if they were not requested */
     623         1824 :         if ((relkind == RELKIND_RELATION ||
     624         1797 :              relkind == RELKIND_PARTITIONED_TABLE) && !tables)
     625            0 :             continue;
     626              : 
     627         1824 :         relstate = palloc_object(SubscriptionRelState);
     628         1824 :         relstate->relid = subrel->srrelid;
     629         1824 :         relstate->state = subrel->srsubstate;
     630         1824 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     631              :                             Anum_pg_subscription_rel_srsublsn, &isnull);
     632         1824 :         if (isnull)
     633         1495 :             relstate->lsn = InvalidXLogRecPtr;
     634              :         else
     635          329 :             relstate->lsn = DatumGetLSN(d);
     636              : 
     637         1824 :         res = lappend(res, relstate);
     638              :     }
     639              : 
     640              :     /* Cleanup: end the catalog scan and close the relation opened above */
     641         1121 :     systable_endscan(scan);
     642         1121 :     table_close(rel, AccessShareLock);
     643              : 
     644         1121 :     return res;
     645              : }
     646              : 
     647              : /*
     648              :  * Update the dead tuple retention status for the given subscription: set subretentionactive to 'active' in the catalog row of subscription 'subid'.  Raises ERROR if no such subscription is found.
     649              :  */
     650              : void
     651            2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
     652              : {
     653              :     Relation    rel;
     654              :     bool        nulls[Natts_pg_subscription];
     655              :     bool        replaces[Natts_pg_subscription];
     656              :     Datum       values[Natts_pg_subscription];
     657              :     HeapTuple   tup;
     658              : 
     659              :     /* Look up the subscription in the catalog (working on a modifiable copy) */
     660            2 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     661            2 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     662              : 
     663            2 :     if (!HeapTupleIsValid(tup))
     664            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     665              : 
     666            2 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     667              : 
     668              :     /* Form a new tuple; only columns flagged in replaces[] are changed. */
     669            2 :     memset(values, 0, sizeof(values));
     670            2 :     memset(nulls, false, sizeof(nulls));
     671            2 :     memset(replaces, false, sizeof(replaces));
     672              : 
     673              :     /* Set the retention-active flag to the requested value.  NOTE(review): the bool is stored without BoolGetDatum() -- confirm this is intended. */
     674            2 :     values[Anum_pg_subscription_subretentionactive - 1] = active;
     675            2 :     replaces[Anum_pg_subscription_subretentionactive - 1] = true;
     676              : 
     677              :     /* Update the catalog */
     678            2 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     679              :                             replaces);
     680            2 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     681            2 :     heap_freetuple(tup);
     682              : 
     683              :     /* Close the catalog; NoLock is passed, so the lock taken at open is not released here */
     684            2 :     table_close(rel, NoLock);
     685            2 : }
        

Generated by: LCOV version 2.0-1