LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 94.8 % 233 221
Test Date: 2026-06-20 21:16:33 Functions: 100.0 % 12 12
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_subscription.c
       4              :  *      replication subscriptions
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/backend/catalog/pg_subscription.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/genam.h"
      18              : #include "access/heapam.h"
      19              : #include "access/htup_details.h"
      20              : #include "access/tableam.h"
      21              : #include "catalog/indexing.h"
      22              : #include "catalog/pg_foreign_server.h"
      23              : #include "catalog/pg_subscription.h"
      24              : #include "catalog/pg_subscription_rel.h"
      25              : #include "catalog/pg_type.h"
      26              : #include "foreign/foreign.h"
      27              : #include "miscadmin.h"
      28              : #include "storage/lmgr.h"
      29              : #include "storage/lock.h"
      30              : #include "utils/acl.h"
      31              : #include "utils/array.h"
      32              : #include "utils/builtins.h"
      33              : #include "utils/fmgroids.h"
      34              : #include "utils/lsyscache.h"
      35              : #include "utils/memutils.h"
      36              : #include "utils/pg_lsn.h"
      37              : #include "utils/rel.h"
      38              : #include "utils/syscache.h"
      39              : 
      40              : static List *textarray_to_stringlist(ArrayType *textarray);
      41              : 
      42              : /*
      43              :  * Add a comma-separated list of publication names to the 'dest' string.
      44              :  *
      45              :  * If quote_literal is true, the returned list can be used to construct an SQL
      46              :  * command, thus no translation is applied.  Otherwise, the string can be used
      47              :  * to create a user-facing message, so translatable quote marks are added.
      48              :  */
      49              : void
      50          546 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
      51              : {
      52              :     ListCell   *lc;
      53          546 :     bool        first = true;
      54              : 
      55              :     Assert(publications != NIL);
      56              : 
      57         1400 :     foreach(lc, publications)
      58              :     {
      59          854 :         char       *pubname = strVal(lfirst(lc));
      60              : 
      61          854 :         if (quote_literal)
      62              :         {
      63          844 :             if (!first)
      64          307 :                 appendStringInfoString(dest, ", ");
      65          844 :             appendStringInfoString(dest, quote_literal_cstr(pubname));
      66              :         }
      67              :         else
      68              :         {
      69           10 :             if (first)
      70            9 :                 appendStringInfo(dest, _("\"%s\""), pubname);
      71              :             else
      72            1 :                 appendStringInfo(dest, _(", \"%s\""), pubname);
      73              :         }
      74              : 
      75          854 :         first = false;
      76              :     }
      77          546 : }
      78              : 
      79              : /*
      80              :  * Fetch the subscription from the syscache.
      81              :  *
      82              :  * If conninfo_needed is true, conninfo will be constructed, possibly
      83              :  * encountering errors in ForeignServerConnectionString(). Callers not
      84              :  * expecting such errors should pass false, in which case conninfo will be
      85              :  * NULL.
      86              :  */
      87              : Subscription *
      88         1050 : GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed,
      89              :                 bool conninfo_aclcheck)
      90              : {
      91              :     HeapTuple   tup;
      92              :     Subscription *sub;
      93              :     Form_pg_subscription subform;
      94              :     Datum       datum;
      95              :     bool        isnull;
      96              :     MemoryContext cxt;
      97              :     MemoryContext oldcxt;
      98              : 
      99              :     Assert(conninfo_needed || !conninfo_aclcheck);
     100              : 
     101         1050 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     102              : 
     103         1050 :     if (!HeapTupleIsValid(tup))
     104              :     {
     105           71 :         if (missing_ok)
     106           71 :             return NULL;
     107              : 
     108            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     109              :     }
     110              : 
     111          979 :     cxt = AllocSetContextCreate(CurrentMemoryContext, "subscription",
     112              :                                 ALLOCSET_SMALL_SIZES);
     113          979 :     oldcxt = MemoryContextSwitchTo(cxt);
     114              : 
     115          979 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
     116              : 
     117          979 :     sub = palloc0_object(Subscription);
     118          979 :     sub->cxt = cxt;
     119          979 :     sub->oid = subid;
     120          979 :     sub->dbid = subform->subdbid;
     121          979 :     sub->skiplsn = subform->subskiplsn;
     122          979 :     sub->name = pstrdup(NameStr(subform->subname));
     123          979 :     sub->owner = subform->subowner;
     124          979 :     sub->enabled = subform->subenabled;
     125          979 :     sub->binary = subform->subbinary;
     126          979 :     sub->stream = subform->substream;
     127          979 :     sub->twophasestate = subform->subtwophasestate;
     128          979 :     sub->disableonerr = subform->subdisableonerr;
     129          979 :     sub->passwordrequired = subform->subpasswordrequired;
     130          979 :     sub->runasowner = subform->subrunasowner;
     131          979 :     sub->failover = subform->subfailover;
     132          979 :     sub->retaindeadtuples = subform->subretaindeadtuples;
     133          979 :     sub->maxretention = subform->submaxretention;
     134          979 :     sub->retentionactive = subform->subretentionactive;
     135              : 
     136          979 :     if (conninfo_needed)
     137              :     {
     138          877 :         if (OidIsValid(subform->subserver))
     139              :         {
     140              :             AclResult   aclresult;
     141              :             ForeignServer *server;
     142              : 
     143            5 :             server = GetForeignServer(subform->subserver);
     144              : 
     145            5 :             if (conninfo_aclcheck)
     146              :             {
     147              :                 /* recheck ACL if requested */
     148            5 :                 aclresult = object_aclcheck(ForeignServerRelationId,
     149              :                                             subform->subserver,
     150              :                                             subform->subowner, ACL_USAGE);
     151              : 
     152            5 :                 if (aclresult != ACLCHECK_OK)
     153            0 :                     ereport(ERROR,
     154              :                             (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     155              :                              errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
     156              :                                     GetUserNameFromId(subform->subowner, false),
     157              :                                     server->servername)));
     158              :             }
     159              : 
     160            5 :             sub->conninfo = ForeignServerConnectionString(subform->subowner,
     161              :                                                           server);
     162              :         }
     163              :         else
     164              :         {
     165          872 :             datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     166              :                                            tup,
     167              :                                            Anum_pg_subscription_subconninfo);
     168          872 :             sub->conninfo = TextDatumGetCString(datum);
     169              :         }
     170              :     }
     171              : 
     172              :     /* Get slotname */
     173          979 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     174              :                             tup,
     175              :                             Anum_pg_subscription_subslotname,
     176              :                             &isnull);
     177          979 :     if (!isnull)
     178          935 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
     179              :     else
     180           44 :         sub->slotname = NULL;
     181              : 
     182              :     /* Get synccommit */
     183          979 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     184              :                                    tup,
     185              :                                    Anum_pg_subscription_subsynccommit);
     186          979 :     sub->synccommit = TextDatumGetCString(datum);
     187              : 
     188              :     /* Get walrcvtimeout */
     189          979 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     190              :                                    tup,
     191              :                                    Anum_pg_subscription_subwalrcvtimeout);
     192          979 :     sub->walrcvtimeout = TextDatumGetCString(datum);
     193              : 
     194              :     /* Get publications */
     195          979 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     196              :                                    tup,
     197              :                                    Anum_pg_subscription_subpublications);
     198          979 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     199              : 
     200              :     /* Get origin */
     201          979 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     202              :                                    tup,
     203              :                                    Anum_pg_subscription_suborigin);
     204          979 :     sub->origin = TextDatumGetCString(datum);
     205              : 
     206              :     /* Is the subscription owner a superuser? */
     207          979 :     sub->ownersuperuser = superuser_arg(sub->owner);
     208              : 
     209          979 :     ReleaseSysCache(tup);
     210              : 
     211          979 :     MemoryContextSwitchTo(oldcxt);
     212              : 
     213          979 :     return sub;
     214              : }
     215              : 
     216              : /*
     217              :  * Return number of subscriptions defined in given database.
     218              :  * Used by dropdb() to check if database can indeed be dropped.
     219              :  */
     220              : int
     221           51 : CountDBSubscriptions(Oid dbid)
     222              : {
     223           51 :     int         nsubs = 0;
     224              :     Relation    rel;
     225              :     ScanKeyData scankey;
     226              :     SysScanDesc scan;
     227              :     HeapTuple   tup;
     228              : 
     229           51 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     230              : 
     231           51 :     ScanKeyInit(&scankey,
     232              :                 Anum_pg_subscription_subdbid,
     233              :                 BTEqualStrategyNumber, F_OIDEQ,
     234              :                 ObjectIdGetDatum(dbid));
     235              : 
     236           51 :     scan = systable_beginscan(rel, InvalidOid, false,
     237              :                               NULL, 1, &scankey);
     238              : 
     239           51 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     240            0 :         nsubs++;
     241              : 
     242           51 :     systable_endscan(scan);
     243              : 
     244           51 :     table_close(rel, NoLock);
     245              : 
     246           51 :     return nsubs;
     247              : }
     248              : 
     249              : /*
     250              :  * Disable the given subscription.
     251              :  */
     252              : void
     253            4 : DisableSubscription(Oid subid)
     254              : {
     255              :     Relation    rel;
     256              :     bool        nulls[Natts_pg_subscription];
     257              :     bool        replaces[Natts_pg_subscription];
     258              :     Datum       values[Natts_pg_subscription];
     259              :     HeapTuple   tup;
     260              : 
     261              :     /* Look up the subscription in the catalog */
     262            4 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     263            4 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     264              : 
     265            4 :     if (!HeapTupleIsValid(tup))
     266            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     267              : 
     268            4 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     269              : 
     270              :     /* Form a new tuple. */
     271            4 :     memset(values, 0, sizeof(values));
     272            4 :     memset(nulls, false, sizeof(nulls));
     273            4 :     memset(replaces, false, sizeof(replaces));
     274              : 
     275              :     /* Set the subscription to disabled. */
     276            4 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     277            4 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
     278              : 
     279              :     /* Update the catalog */
     280            4 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     281              :                             replaces);
     282            4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     283            4 :     heap_freetuple(tup);
     284              : 
     285            4 :     table_close(rel, NoLock);
     286            4 : }
     287              : 
     288              : /*
     289              :  * Convert text array to list of strings.
     290              :  *
     291              :  * Note: the resulting list of strings is pallocated here.
     292              :  */
     293              : static List *
     294          979 : textarray_to_stringlist(ArrayType *textarray)
     295              : {
     296              :     Datum      *elems;
     297              :     int         nelems,
     298              :                 i;
     299          979 :     List       *res = NIL;
     300              : 
     301          979 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
     302              : 
     303          979 :     if (nelems == 0)
     304            0 :         return NIL;
     305              : 
     306         2360 :     for (i = 0; i < nelems; i++)
     307         1381 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     308              : 
     309          979 :     return res;
     310              : }
     311              : 
     312              : /*
     313              :  * Add new state record for a subscription table.
     314              :  *
     315              :  * If retain_lock is true, then don't release the locks taken in this function.
     316              :  * We normally release the locks at the end of transaction but in binary-upgrade
     317              :  * mode, we expect to release those immediately.
     318              :  */
     319              : void
     320          228 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     321              :                         XLogRecPtr sublsn, bool retain_lock)
     322              : {
     323              :     Relation    rel;
     324              :     HeapTuple   tup;
     325              :     bool        nulls[Natts_pg_subscription_rel];
     326              :     Datum       values[Natts_pg_subscription_rel];
     327              : 
     328          228 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     329              : 
     330          228 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     331              : 
     332              :     /* Try finding existing mapping. */
     333          228 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     334              :                               ObjectIdGetDatum(relid),
     335              :                               ObjectIdGetDatum(subid));
     336          228 :     if (HeapTupleIsValid(tup))
     337            0 :         elog(ERROR, "subscription relation %u in subscription %u already exists",
     338              :              relid, subid);
     339              : 
     340              :     /* Form the tuple. */
     341          228 :     memset(values, 0, sizeof(values));
     342          228 :     memset(nulls, false, sizeof(nulls));
     343          228 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     344          228 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     345          228 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     346          228 :     if (XLogRecPtrIsValid(sublsn))
     347            2 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     348              :     else
     349          226 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     350              : 
     351          228 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     352              : 
     353              :     /* Insert tuple into catalog. */
     354          228 :     CatalogTupleInsert(rel, tup);
     355              : 
     356          228 :     heap_freetuple(tup);
     357              : 
     358              :     /* Cleanup. */
     359          228 :     if (retain_lock)
     360              :     {
     361          225 :         table_close(rel, NoLock);
     362              :     }
     363              :     else
     364              :     {
     365            3 :         table_close(rel, RowExclusiveLock);
     366            3 :         UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     367              :     }
     368          228 : }
     369              : 
     370              : /*
     371              :  * Update the state of a subscription table.
     372              :  */
     373              : void
     374          842 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     375              :                            XLogRecPtr sublsn, bool already_locked)
     376              : {
     377              :     Relation    rel;
     378              :     HeapTuple   tup;
     379              :     bool        nulls[Natts_pg_subscription_rel];
     380              :     Datum       values[Natts_pg_subscription_rel];
     381              :     bool        replaces[Natts_pg_subscription_rel];
     382              : 
     383          842 :     if (already_locked)
     384              :     {
     385              : #ifdef USE_ASSERT_CHECKING
     386              :         LOCKTAG     tag;
     387              : 
     388              :         Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
     389              :                                           RowExclusiveLock, true));
     390              :         SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
     391              :         Assert(LockHeldByMe(&tag, AccessShareLock, true));
     392              : #endif
     393              : 
     394          196 :         rel = table_open(SubscriptionRelRelationId, NoLock);
     395              :     }
     396              :     else
     397              :     {
     398          646 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     399          646 :         rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     400              :     }
     401              : 
     402              :     /* Try finding existing mapping. */
     403          842 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     404              :                               ObjectIdGetDatum(relid),
     405              :                               ObjectIdGetDatum(subid));
     406          842 :     if (!HeapTupleIsValid(tup))
     407            0 :         elog(ERROR, "subscription relation %u in subscription %u does not exist",
     408              :              relid, subid);
     409              : 
     410              :     /* Update the tuple. */
     411          842 :     memset(values, 0, sizeof(values));
     412          842 :     memset(nulls, false, sizeof(nulls));
     413          842 :     memset(replaces, false, sizeof(replaces));
     414              : 
     415          842 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     416          842 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     417              : 
     418          842 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     419          842 :     if (XLogRecPtrIsValid(sublsn))
     420          414 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     421              :     else
     422          428 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     423              : 
     424          842 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     425              :                             replaces);
     426              : 
     427              :     /* Update the catalog. */
     428          842 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     429              : 
     430              :     /* Cleanup. */
     431          842 :     table_close(rel, NoLock);
     432          842 : }
     433              : 
     434              : /*
     435              :  * Get state of subscription table.
     436              :  *
     437              :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     438              :  */
     439              : char
     440         1309 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     441              : {
     442              :     HeapTuple   tup;
     443              :     char        substate;
     444              :     bool        isnull;
     445              :     Datum       d;
     446              :     Relation    rel;
     447              : 
     448              :     /*
     449              :      * This is to avoid the race condition with AlterSubscription which tries
     450              :      * to remove this relstate.
     451              :      */
     452         1309 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     453              : 
     454              :     /* Try finding the mapping. */
     455         1309 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     456              :                           ObjectIdGetDatum(relid),
     457              :                           ObjectIdGetDatum(subid));
     458              : 
     459         1309 :     if (!HeapTupleIsValid(tup))
     460              :     {
     461           33 :         table_close(rel, AccessShareLock);
     462           33 :         *sublsn = InvalidXLogRecPtr;
     463           33 :         return SUBREL_STATE_UNKNOWN;
     464              :     }
     465              : 
     466              :     /* Get the state. */
     467         1276 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     468              : 
     469              :     /* Get the LSN */
     470         1276 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     471              :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     472         1276 :     if (isnull)
     473          684 :         *sublsn = InvalidXLogRecPtr;
     474              :     else
     475          592 :         *sublsn = DatumGetLSN(d);
     476              : 
     477              :     /* Cleanup */
     478         1276 :     ReleaseSysCache(tup);
     479              : 
     480         1276 :     table_close(rel, AccessShareLock);
     481              : 
     482         1276 :     return substate;
     483              : }
     484              : 
     485              : /*
     486              :  * Drop subscription relation mapping. These can be for a particular
     487              :  * subscription, or for a particular relation, or both.
     488              :  */
     489              : void
     490        33948 : RemoveSubscriptionRel(Oid subid, Oid relid)
     491              : {
     492              :     Relation    rel;
     493              :     TableScanDesc scan;
     494              :     ScanKeyData skey[2];
     495              :     HeapTuple   tup;
     496        33948 :     int         nkeys = 0;
     497              : 
     498        33948 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     499              : 
     500        33948 :     if (OidIsValid(subid))
     501              :     {
     502          172 :         ScanKeyInit(&skey[nkeys++],
     503              :                     Anum_pg_subscription_rel_srsubid,
     504              :                     BTEqualStrategyNumber,
     505              :                     F_OIDEQ,
     506              :                     ObjectIdGetDatum(subid));
     507              :     }
     508              : 
     509        33948 :     if (OidIsValid(relid))
     510              :     {
     511        33797 :         ScanKeyInit(&skey[nkeys++],
     512              :                     Anum_pg_subscription_rel_srrelid,
     513              :                     BTEqualStrategyNumber,
     514              :                     F_OIDEQ,
     515              :                     ObjectIdGetDatum(relid));
     516              :     }
     517              : 
     518              :     /* Do the search and delete what we found. */
     519        33948 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     520        34078 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     521              :     {
     522              :         Form_pg_subscription_rel subrel;
     523              : 
     524          130 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     525              : 
     526              :         /*
     527              :          * We don't allow to drop the relation mapping when the table
     528              :          * synchronization is in progress unless the caller updates the
     529              :          * corresponding subscription as well. This is to ensure that we don't
     530              :          * leave tablesync slots or origins in the system when the
     531              :          * corresponding table is dropped. For sequences, however, it's ok to
     532              :          * drop them since no separate slots or origins are created during
     533              :          * synchronization.
     534              :          */
     535          130 :         if (!OidIsValid(subid) &&
     536           16 :             subrel->srsubstate != SUBREL_STATE_READY &&
     537            0 :             get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
     538              :         {
     539            0 :             ereport(ERROR,
     540              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     541              :                      errmsg("could not drop relation mapping for subscription \"%s\"",
     542              :                             get_subscription_name(subrel->srsubid, false)),
     543              :                      errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
     544              :                                get_rel_name(relid), subrel->srsubstate),
     545              : 
     546              :             /*
     547              :              * translator: first %s is a SQL ALTER command and second %s is a
     548              :              * SQL DROP command
     549              :              */
     550              :                      errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
     551              :                              "ALTER SUBSCRIPTION ... ENABLE",
     552              :                              "DROP SUBSCRIPTION ...")));
     553              :         }
     554              : 
     555          130 :         CatalogTupleDelete(rel, &tup->t_self);
     556              :     }
     557        33948 :     table_endscan(scan);
     558              : 
     559        33948 :     table_close(rel, RowExclusiveLock);
     560        33948 : }
     561              : 
     562              : /*
     563              :  * Does the subscription have at least one table (plain or partitioned)?
     564              :  *
     565              :  * Returns true as soon as one such pg_subscription_rel entry is found.  Use
     566              :  * this only when the List built by GetSubscriptionRelations is not needed.
     567              :  */
     568              : bool
     569          309 : HasSubscriptionTables(Oid subid)
     570              : {
     571              :     Relation    rel;
     572              :     ScanKeyData skey[1];
     573              :     SysScanDesc scan;
     574              :     HeapTuple   tup;
     575          309 :     bool        has_subtables = false;
     576              : 
     577          309 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     578              : 
     579          309 :     ScanKeyInit(&skey[0],
     580              :                 Anum_pg_subscription_rel_srsubid,
     581              :                 BTEqualStrategyNumber, F_OIDEQ,
     582              :                 ObjectIdGetDatum(subid));
     583              : 
     584          309 :     scan = systable_beginscan(rel, InvalidOid, false,
     585              :                               NULL, 1, skey);
     586              : 
     587          368 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     588              :     {
     589              :         Form_pg_subscription_rel subrel;
     590              :         char        relkind;
     591              : 
     592          349 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     593          349 :         relkind = get_rel_relkind(subrel->srrelid);
     594              :         /* Entries of any other relkind are skipped; stop at the first table. */
     595          349 :         if (relkind == RELKIND_RELATION ||
     596              :             relkind == RELKIND_PARTITIONED_TABLE)
     597              :         {
     598          290 :             has_subtables = true;
     599          290 :             break;
     600              :         }
     601              :     }
     602              : 
     603              :     /* Cleanup: end the scan and close the catalog opened above */
     604          309 :     systable_endscan(scan);
     605          309 :     table_close(rel, AccessShareLock);
     606              : 
     607          309 :     return has_subtables;
     608              : }
     609              : 
     610              : /*
     611              :  * Get the relations for the subscription, as a List of SubscriptionRelState.
     612              :  *
     613              :  * Tables are returned only if 'tables' is true and sequences only if
     614              :  * 'sequences' is true.  If not_ready is true, relations in ready state are
     615              :  * left out.  The returned list is palloc'ed in the current memory context.
     616              :  */
     617              : List *
     618         1251 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
     619              :                          bool not_ready)
     620              : {
     621         1251 :     List       *res = NIL;
     622              :     Relation    rel;
     623              :     HeapTuple   tup;
     624         1251 :     int         nkeys = 0;
     625              :     ScanKeyData skey[2];
     626              :     SysScanDesc scan;
     627              : 
     628              :     /* One or both of 'tables' and 'sequences' must be true. */
     629              :     Assert(tables || sequences);
     630              : 
     631         1251 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     632              : 
     633         1251 :     ScanKeyInit(&skey[nkeys++],
     634              :                 Anum_pg_subscription_rel_srsubid,
     635              :                 BTEqualStrategyNumber, F_OIDEQ,
     636              :                 ObjectIdGetDatum(subid));
     637              :     /* Optionally add a second key: srsubstate <> SUBREL_STATE_READY */
     638         1251 :     if (not_ready)
     639         1210 :         ScanKeyInit(&skey[nkeys++],
     640              :                     Anum_pg_subscription_rel_srsubstate,
     641              :                     BTEqualStrategyNumber, F_CHARNE,
     642              :                     CharGetDatum(SUBREL_STATE_READY));
     643              : 
     644         1251 :     scan = systable_beginscan(rel, InvalidOid, false,
     645              :                               NULL, nkeys, skey);
     646              : 
     647         3239 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     648              :     {
     649              :         Form_pg_subscription_rel subrel;
     650              :         SubscriptionRelState *relstate;
     651              :         Datum       d;
     652              :         bool        isnull;
     653              :         char        relkind;
     654              : 
     655         1988 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     656              : 
     657              :         /* Relation is either a sequence or a table */
     658         1988 :         relkind = get_rel_relkind(subrel->srrelid);
     659              :         Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
     660              :                relkind == RELKIND_PARTITIONED_TABLE);
     661              : 
     662              :         /* Skip sequences if they were not requested */
     663         1988 :         if ((relkind == RELKIND_SEQUENCE) && !sequences)
     664            0 :             continue;
     665              : 
     666              :         /* Skip tables if they were not requested */
     667         1988 :         if ((relkind == RELKIND_RELATION ||
     668         1938 :              relkind == RELKIND_PARTITIONED_TABLE) && !tables)
     669            0 :             continue;
     670              :         /* Build the list entry; a NULL srsublsn becomes InvalidXLogRecPtr */
     671         1988 :         relstate = palloc_object(SubscriptionRelState);
     672         1988 :         relstate->relid = subrel->srrelid;
     673         1988 :         relstate->state = subrel->srsubstate;
     674         1988 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     675              :                             Anum_pg_subscription_rel_srsublsn, &isnull);
     676         1988 :         if (isnull)
     677         1621 :             relstate->lsn = InvalidXLogRecPtr;
     678              :         else
     679          367 :             relstate->lsn = DatumGetLSN(d);
     680              : 
     681         1988 :         res = lappend(res, relstate);
     682              :     }
     683              : 
     684              :     /* Cleanup: end the scan and close the catalog opened above */
     685         1251 :     systable_endscan(scan);
     686         1251 :     table_close(rel, AccessShareLock);
     687              : 
     688         1251 :     return res;
     689              : }
     690              : 
     691              : /*
     692              :  * Set the dead tuple retention status (subretentionactive) of subscription subid.
     693              :  */
     694              : void
     695            2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
     696              : {
     697              :     Relation    rel;
     698              :     bool        nulls[Natts_pg_subscription];
     699              :     bool        replaces[Natts_pg_subscription];
     700              :     Datum       values[Natts_pg_subscription];
     701              :     HeapTuple   tup;
     702              : 
     703              :     /* Look up the subscription in the catalog; ERROR out if it is not found */
     704            2 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     705            2 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     706              : 
     707            2 :     if (!HeapTupleIsValid(tup))
     708            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     709              : 
     710            2 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     711              : 
     712              :     /* Form a new tuple; initially no column is marked for replacement. */
     713            2 :     memset(values, 0, sizeof(values));
     714            2 :     memset(nulls, false, sizeof(nulls));
     715            2 :     memset(replaces, false, sizeof(replaces));
     716              : 
     717              :     /* Replace only subretentionactive, with the caller-supplied value. */
     718            2 :     values[Anum_pg_subscription_subretentionactive - 1] = BoolGetDatum(active);
     719            2 :     replaces[Anum_pg_subscription_subretentionactive - 1] = true;
     720              : 
     721              :     /* Update the catalog */
     722            2 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     723              :                             replaces);
     724            2 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     725            2 :     heap_freetuple(tup);
     726              :     /* NoLock is passed, so the lock taken at table_open is not released here */
     727            2 :     table_close(rel, NoLock);
     728            2 : }
        

Generated by: LCOV version 2.0-1