LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 94.8 % 232 220
Test Date: 2026-06-13 21:16:49 Functions: 100.0 % 12 12
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_subscription.c
       4              :  *      replication subscriptions
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/backend/catalog/pg_subscription.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/genam.h"
      18              : #include "access/heapam.h"
      19              : #include "access/htup_details.h"
      20              : #include "access/tableam.h"
      21              : #include "catalog/indexing.h"
      22              : #include "catalog/pg_foreign_server.h"
      23              : #include "catalog/pg_subscription.h"
      24              : #include "catalog/pg_subscription_rel.h"
      25              : #include "catalog/pg_type.h"
      26              : #include "foreign/foreign.h"
      27              : #include "miscadmin.h"
      28              : #include "storage/lmgr.h"
      29              : #include "storage/lock.h"
      30              : #include "utils/acl.h"
      31              : #include "utils/array.h"
      32              : #include "utils/builtins.h"
      33              : #include "utils/fmgroids.h"
      34              : #include "utils/lsyscache.h"
      35              : #include "utils/memutils.h"
      36              : #include "utils/pg_lsn.h"
      37              : #include "utils/rel.h"
      38              : #include "utils/syscache.h"
      39              : 
      40              : static List *textarray_to_stringlist(ArrayType *textarray);
      41              : 
      42              : /*
      43              :  * Add a comma-separated list of publication names to the 'dest' string.
      44              :  *
      45              :  * If quote_literal is true, the returned list can be used to construct an SQL
      46              :  * command, thus no translation is applied.  Otherwise, the string can be used
      47              :  * to create a user-facing message, so translatable quote marks are added.
      48              :  */
      49              : void
      50          547 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
      51              : {
      52              :     ListCell   *lc;
      53          547 :     bool        first = true;
      54              : 
      55              :     Assert(publications != NIL);
      56              : 
      57         1402 :     foreach(lc, publications)
      58              :     {
      59          855 :         char       *pubname = strVal(lfirst(lc));
      60              : 
      61          855 :         if (quote_literal)
      62              :         {
      63          845 :             if (!first)
      64          307 :                 appendStringInfoString(dest, ", ");
      65          845 :             appendStringInfoString(dest, quote_literal_cstr(pubname));
      66              :         }
      67              :         else
      68              :         {
      69           10 :             if (first)
      70            9 :                 appendStringInfo(dest, _("\"%s\""), pubname);
      71              :             else
      72            1 :                 appendStringInfo(dest, _(", \"%s\""), pubname);
      73              :         }
      74              : 
      75          855 :         first = false;
      76              :     }
      77          547 : }
      78              : 
      79              : /*
      80              :  * Fetch the subscription from the syscache.
      81              :  */
      82              : Subscription *
      83         1054 : GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
      84              : {
      85              :     HeapTuple   tup;
      86              :     Subscription *sub;
      87              :     Form_pg_subscription subform;
      88              :     Datum       datum;
      89              :     bool        isnull;
      90              :     MemoryContext cxt;
      91              :     MemoryContext oldcxt;
      92              : 
      93         1054 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      94              : 
      95         1054 :     if (!HeapTupleIsValid(tup))
      96              :     {
      97           70 :         if (missing_ok)
      98           70 :             return NULL;
      99              : 
     100            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     101              :     }
     102              : 
     103          984 :     cxt = AllocSetContextCreate(CurrentMemoryContext, "subscription",
     104              :                                 ALLOCSET_SMALL_SIZES);
     105          984 :     oldcxt = MemoryContextSwitchTo(cxt);
     106              : 
     107          984 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
     108              : 
     109          984 :     sub = palloc_object(Subscription);
     110          984 :     sub->cxt = cxt;
     111          984 :     sub->oid = subid;
     112          984 :     sub->dbid = subform->subdbid;
     113          984 :     sub->skiplsn = subform->subskiplsn;
     114          984 :     sub->name = pstrdup(NameStr(subform->subname));
     115          984 :     sub->owner = subform->subowner;
     116          984 :     sub->enabled = subform->subenabled;
     117          984 :     sub->binary = subform->subbinary;
     118          984 :     sub->stream = subform->substream;
     119          984 :     sub->twophasestate = subform->subtwophasestate;
     120          984 :     sub->disableonerr = subform->subdisableonerr;
     121          984 :     sub->passwordrequired = subform->subpasswordrequired;
     122          984 :     sub->runasowner = subform->subrunasowner;
     123          984 :     sub->failover = subform->subfailover;
     124          984 :     sub->retaindeadtuples = subform->subretaindeadtuples;
     125          984 :     sub->maxretention = subform->submaxretention;
     126          984 :     sub->retentionactive = subform->subretentionactive;
     127              : 
     128              :     /* Get conninfo */
     129          984 :     if (OidIsValid(subform->subserver))
     130              :     {
     131              :         AclResult   aclresult;
     132              :         ForeignServer *server;
     133              : 
     134           10 :         server = GetForeignServer(subform->subserver);
     135              : 
     136              :         /* recheck ACL if requested */
     137           10 :         if (aclcheck)
     138              :         {
     139            5 :             aclresult = object_aclcheck(ForeignServerRelationId,
     140              :                                         subform->subserver,
     141              :                                         subform->subowner, ACL_USAGE);
     142              : 
     143            5 :             if (aclresult != ACLCHECK_OK)
     144            0 :                 ereport(ERROR,
     145              :                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     146              :                          errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
     147              :                                 GetUserNameFromId(subform->subowner, false),
     148              :                                 server->servername)));
     149              :         }
     150              : 
     151           10 :         sub->conninfo = ForeignServerConnectionString(subform->subowner,
     152              :                                                       server);
     153              :     }
     154              :     else
     155              :     {
     156          974 :         datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     157              :                                        tup,
     158              :                                        Anum_pg_subscription_subconninfo);
     159          974 :         sub->conninfo = TextDatumGetCString(datum);
     160              :     }
     161              : 
     162              :     /* Get slotname */
     163          984 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     164              :                             tup,
     165              :                             Anum_pg_subscription_subslotname,
     166              :                             &isnull);
     167          984 :     if (!isnull)
     168          940 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
     169              :     else
     170           44 :         sub->slotname = NULL;
     171              : 
     172              :     /* Get synccommit */
     173          984 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     174              :                                    tup,
     175              :                                    Anum_pg_subscription_subsynccommit);
     176          984 :     sub->synccommit = TextDatumGetCString(datum);
     177              : 
     178              :     /* Get walrcvtimeout */
     179          984 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     180              :                                    tup,
     181              :                                    Anum_pg_subscription_subwalrcvtimeout);
     182          984 :     sub->walrcvtimeout = TextDatumGetCString(datum);
     183              : 
     184              :     /* Get publications */
     185          984 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     186              :                                    tup,
     187              :                                    Anum_pg_subscription_subpublications);
     188          984 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     189              : 
     190              :     /* Get origin */
     191          984 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     192              :                                    tup,
     193              :                                    Anum_pg_subscription_suborigin);
     194          984 :     sub->origin = TextDatumGetCString(datum);
     195              : 
     196              :     /* Is the subscription owner a superuser? */
     197          984 :     sub->ownersuperuser = superuser_arg(sub->owner);
     198              : 
     199          984 :     ReleaseSysCache(tup);
     200              : 
     201          984 :     MemoryContextSwitchTo(oldcxt);
     202              : 
     203          984 :     return sub;
     204              : }
     205              : 
     206              : /*
     207              :  * Return number of subscriptions defined in given database.
     208              :  * Used by dropdb() to check if database can indeed be dropped.
     209              :  */
     210              : int
     211           50 : CountDBSubscriptions(Oid dbid)
     212              : {
     213           50 :     int         nsubs = 0;
     214              :     Relation    rel;
     215              :     ScanKeyData scankey;
     216              :     SysScanDesc scan;
     217              :     HeapTuple   tup;
     218              : 
     219           50 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     220              : 
     221           50 :     ScanKeyInit(&scankey,
     222              :                 Anum_pg_subscription_subdbid,
     223              :                 BTEqualStrategyNumber, F_OIDEQ,
     224              :                 ObjectIdGetDatum(dbid));
     225              : 
     226           50 :     scan = systable_beginscan(rel, InvalidOid, false,
     227              :                               NULL, 1, &scankey);
     228              : 
     229           50 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     230            0 :         nsubs++;
     231              : 
     232           50 :     systable_endscan(scan);
     233              : 
     234           50 :     table_close(rel, NoLock);
     235              : 
     236           50 :     return nsubs;
     237              : }
     238              : 
     239              : /*
     240              :  * Disable the given subscription.
     241              :  */
     242              : void
     243            4 : DisableSubscription(Oid subid)
     244              : {
     245              :     Relation    rel;
     246              :     bool        nulls[Natts_pg_subscription];
     247              :     bool        replaces[Natts_pg_subscription];
     248              :     Datum       values[Natts_pg_subscription];
     249              :     HeapTuple   tup;
     250              : 
     251              :     /* Look up the subscription in the catalog */
     252            4 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     253            4 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     254              : 
     255            4 :     if (!HeapTupleIsValid(tup))
     256            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     257              : 
     258            4 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     259              : 
     260              :     /* Form a new tuple. */
     261            4 :     memset(values, 0, sizeof(values));
     262            4 :     memset(nulls, false, sizeof(nulls));
     263            4 :     memset(replaces, false, sizeof(replaces));
     264              : 
     265              :     /* Set the subscription to disabled. */
     266            4 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     267            4 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
     268              : 
     269              :     /* Update the catalog */
     270            4 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     271              :                             replaces);
     272            4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     273            4 :     heap_freetuple(tup);
     274              : 
     275            4 :     table_close(rel, NoLock);
     276            4 : }
     277              : 
     278              : /*
     279              :  * Convert text array to list of strings.
     280              :  *
     281              :  * Note: the resulting list of strings is pallocated here.
     282              :  */
     283              : static List *
     284          984 : textarray_to_stringlist(ArrayType *textarray)
     285              : {
     286              :     Datum      *elems;
     287              :     int         nelems,
     288              :                 i;
     289          984 :     List       *res = NIL;
     290              : 
     291          984 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
     292              : 
     293          984 :     if (nelems == 0)
     294            0 :         return NIL;
     295              : 
     296         2389 :     for (i = 0; i < nelems; i++)
     297         1405 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     298              : 
     299          984 :     return res;
     300              : }
     301              : 
     302              : /*
     303              :  * Add new state record for a subscription table.
     304              :  *
     305              :  * If retain_lock is true, then don't release the locks taken in this function.
     306              :  * We normally release the locks at the end of transaction but in binary-upgrade
     307              :  * mode, we expect to release those immediately.
     308              :  */
     309              : void
     310          228 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     311              :                         XLogRecPtr sublsn, bool retain_lock)
     312              : {
     313              :     Relation    rel;
     314              :     HeapTuple   tup;
     315              :     bool        nulls[Natts_pg_subscription_rel];
     316              :     Datum       values[Natts_pg_subscription_rel];
     317              : 
     318          228 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     319              : 
     320          228 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     321              : 
     322              :     /* Try finding existing mapping. */
     323          228 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     324              :                               ObjectIdGetDatum(relid),
     325              :                               ObjectIdGetDatum(subid));
     326          228 :     if (HeapTupleIsValid(tup))
     327            0 :         elog(ERROR, "subscription relation %u in subscription %u already exists",
     328              :              relid, subid);
     329              : 
     330              :     /* Form the tuple. */
     331          228 :     memset(values, 0, sizeof(values));
     332          228 :     memset(nulls, false, sizeof(nulls));
     333          228 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     334          228 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     335          228 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     336          228 :     if (XLogRecPtrIsValid(sublsn))
     337            2 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     338              :     else
     339          226 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     340              : 
     341          228 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     342              : 
     343              :     /* Insert tuple into catalog. */
     344          228 :     CatalogTupleInsert(rel, tup);
     345              : 
     346          228 :     heap_freetuple(tup);
     347              : 
     348              :     /* Cleanup. */
     349          228 :     if (retain_lock)
     350              :     {
     351          225 :         table_close(rel, NoLock);
     352              :     }
     353              :     else
     354              :     {
     355            3 :         table_close(rel, RowExclusiveLock);
     356            3 :         UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     357              :     }
     358          228 : }
     359              : 
     360              : /*
     361              :  * Update the state of a subscription table.
     362              :  */
     363              : void
     364          830 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     365              :                            XLogRecPtr sublsn, bool already_locked)
     366              : {
     367              :     Relation    rel;
     368              :     HeapTuple   tup;
     369              :     bool        nulls[Natts_pg_subscription_rel];
     370              :     Datum       values[Natts_pg_subscription_rel];
     371              :     bool        replaces[Natts_pg_subscription_rel];
     372              : 
     373          830 :     if (already_locked)
     374              :     {
     375              : #ifdef USE_ASSERT_CHECKING
     376              :         LOCKTAG     tag;
     377              : 
     378              :         Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
     379              :                                           RowExclusiveLock, true));
     380              :         SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
     381              :         Assert(LockHeldByMe(&tag, AccessShareLock, true));
     382              : #endif
     383              : 
     384          196 :         rel = table_open(SubscriptionRelRelationId, NoLock);
     385              :     }
     386              :     else
     387              :     {
     388          634 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     389          633 :         rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     390              :     }
     391              : 
     392              :     /* Try finding existing mapping. */
     393          829 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     394              :                               ObjectIdGetDatum(relid),
     395              :                               ObjectIdGetDatum(subid));
     396          829 :     if (!HeapTupleIsValid(tup))
     397            0 :         elog(ERROR, "subscription relation %u in subscription %u does not exist",
     398              :              relid, subid);
     399              : 
     400              :     /* Update the tuple. */
     401          829 :     memset(values, 0, sizeof(values));
     402          829 :     memset(nulls, false, sizeof(nulls));
     403          829 :     memset(replaces, false, sizeof(replaces));
     404              : 
     405          829 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     406          829 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     407              : 
     408          829 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     409          829 :     if (XLogRecPtrIsValid(sublsn))
     410          408 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     411              :     else
     412          421 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     413              : 
     414          829 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     415              :                             replaces);
     416              : 
     417              :     /* Update the catalog. */
     418          829 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     419              : 
     420              :     /* Cleanup. */
     421          829 :     table_close(rel, NoLock);
     422          829 : }
     423              : 
     424              : /*
     425              :  * Get state of subscription table.
     426              :  *
     427              :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     428              :  */
     429              : char
     430         4291 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     431              : {
     432              :     HeapTuple   tup;
     433              :     char        substate;
     434              :     bool        isnull;
     435              :     Datum       d;
     436              :     Relation    rel;
     437              : 
     438              :     /*
     439              :      * This is to avoid the race condition with AlterSubscription which tries
     440              :      * to remove this relstate.
     441              :      */
     442         4291 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     443              : 
     444              :     /* Try finding the mapping. */
     445         4291 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     446              :                           ObjectIdGetDatum(relid),
     447              :                           ObjectIdGetDatum(subid));
     448              : 
     449         4291 :     if (!HeapTupleIsValid(tup))
     450              :     {
     451           33 :         table_close(rel, AccessShareLock);
     452           33 :         *sublsn = InvalidXLogRecPtr;
     453           33 :         return SUBREL_STATE_UNKNOWN;
     454              :     }
     455              : 
     456              :     /* Get the state. */
     457         4258 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     458              : 
     459              :     /* Get the LSN */
     460         4258 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     461              :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     462         4258 :     if (isnull)
     463         3673 :         *sublsn = InvalidXLogRecPtr;
     464              :     else
     465          585 :         *sublsn = DatumGetLSN(d);
     466              : 
     467              :     /* Cleanup */
     468         4258 :     ReleaseSysCache(tup);
     469              : 
     470         4258 :     table_close(rel, AccessShareLock);
     471              : 
     472         4258 :     return substate;
     473              : }
     474              : 
     475              : /*
     476              :  * Drop subscription relation mapping. These can be for a particular
     477              :  * subscription, or for a particular relation, or both.
     478              :  */
     479              : void
     480        33821 : RemoveSubscriptionRel(Oid subid, Oid relid)
     481              : {
     482              :     Relation    rel;
     483              :     TableScanDesc scan;
     484              :     ScanKeyData skey[2];
     485              :     HeapTuple   tup;
     486        33821 :     int         nkeys = 0;
     487              : 
     488        33821 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     489              : 
     490        33821 :     if (OidIsValid(subid))
     491              :     {
     492          164 :         ScanKeyInit(&skey[nkeys++],
     493              :                     Anum_pg_subscription_rel_srsubid,
     494              :                     BTEqualStrategyNumber,
     495              :                     F_OIDEQ,
     496              :                     ObjectIdGetDatum(subid));
     497              :     }
     498              : 
     499        33821 :     if (OidIsValid(relid))
     500              :     {
     501        33678 :         ScanKeyInit(&skey[nkeys++],
     502              :                     Anum_pg_subscription_rel_srrelid,
     503              :                     BTEqualStrategyNumber,
     504              :                     F_OIDEQ,
     505              :                     ObjectIdGetDatum(relid));
     506              :     }
     507              : 
     508              :     /* Do the search and delete what we found. */
     509        33821 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     510        33951 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     511              :     {
     512              :         Form_pg_subscription_rel subrel;
     513              : 
     514          130 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     515              : 
     516              :         /*
     517              :          * We don't allow to drop the relation mapping when the table
     518              :          * synchronization is in progress unless the caller updates the
     519              :          * corresponding subscription as well. This is to ensure that we don't
     520              :          * leave tablesync slots or origins in the system when the
     521              :          * corresponding table is dropped. For sequences, however, it's ok to
     522              :          * drop them since no separate slots or origins are created during
     523              :          * synchronization.
     524              :          */
     525          130 :         if (!OidIsValid(subid) &&
     526           16 :             subrel->srsubstate != SUBREL_STATE_READY &&
     527            0 :             get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
     528              :         {
     529            0 :             ereport(ERROR,
     530              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     531              :                      errmsg("could not drop relation mapping for subscription \"%s\"",
     532              :                             get_subscription_name(subrel->srsubid, false)),
     533              :                      errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
     534              :                                get_rel_name(relid), subrel->srsubstate),
     535              : 
     536              :             /*
     537              :              * translator: first %s is a SQL ALTER command and second %s is a
     538              :              * SQL DROP command
     539              :              */
     540              :                      errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
     541              :                              "ALTER SUBSCRIPTION ... ENABLE",
     542              :                              "DROP SUBSCRIPTION ...")));
     543              :         }
     544              : 
     545          130 :         CatalogTupleDelete(rel, &tup->t_self);
     546              :     }
     547        33821 :     table_endscan(scan);
     548              : 
     549        33821 :     table_close(rel, RowExclusiveLock);
     550        33821 : }
     551              : 
     552              : /*
     553              :  * Does the subscription 'subid' have any tables?  Returns true as soon as one
     554              :  * pg_subscription_rel row for it refers to a plain or partitioned table; rows
     555              :  * of any other relkind are skipped.  Use this function only to know true/false,
     556              :  * and when you have no need for the List returned by GetSubscriptionRelations.
     557              :  */
     558              : bool
     559          301 : HasSubscriptionTables(Oid subid)
     560              : {
     561              :     Relation    rel;
     562              :     ScanKeyData skey[1];
     563              :     SysScanDesc scan;
     564              :     HeapTuple   tup;
     565          301 :     bool        has_subtables = false;
     566              : 
     567          301 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     568              :     /* Restrict the scan to rows whose srsubid equals subid. */
     569          301 :     ScanKeyInit(&skey[0],
     570              :                 Anum_pg_subscription_rel_srsubid,
     571              :                 BTEqualStrategyNumber, F_OIDEQ,
     572              :                 ObjectIdGetDatum(subid));
     573              :     /* InvalidOid and false: no index is used for this scan. */
     574          301 :     scan = systable_beginscan(rel, InvalidOid, false,
     575              :                               NULL, 1, skey);
     576              : 
     577          351 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     578              :     {
     579              :         Form_pg_subscription_rel subrel;
     580              :         char        relkind;
     581              : 
     582          333 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     583          333 :         relkind = get_rel_relkind(subrel->srrelid);
     584              :         /* Only plain and partitioned tables count; stop at the first one. */
     585          333 :         if (relkind == RELKIND_RELATION ||
     586              :             relkind == RELKIND_PARTITIONED_TABLE)
     587              :         {
     588          283 :             has_subtables = true;
     589          283 :             break;
     590              :         }
     591              :     }
     592              : 
     593              :     /* Cleanup: end the scan and release the lock taken by table_open */
     594          301 :     systable_endscan(scan);
     595          301 :     table_close(rel, AccessShareLock);
     596              : 
     597          301 :     return has_subtables;
     598              : }
     599              : 
     600              : /*
     601              :  * Get the relations for the subscription, as a List of SubscriptionRelState.
     602              :  * 'tables' and 'sequences' select which relation kinds are returned; at least
     603              :  * one of them must be true.  If not_ready is true, return only the relations
     604              :  * that are not in a ready state, otherwise return all requested relations.  The
     605              :  * returned list is palloc'ed in the current memory context.
     606              :  */
     607              : List *
     608         1226 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
     609              :                          bool not_ready)
     610              : {
     611         1226 :     List       *res = NIL;
     612              :     Relation    rel;
     613              :     HeapTuple   tup;
     614         1226 :     int         nkeys = 0;
     615              :     ScanKeyData skey[2];
     616              :     SysScanDesc scan;
     617              : 
     618              :     /* One or both of 'tables' and 'sequences' must be true. */
     619              :     Assert(tables || sequences);
     620              : 
     621         1226 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     622              :     /* First key: rows whose srsubid equals subid. */
     623         1226 :     ScanKeyInit(&skey[nkeys++],
     624              :                 Anum_pg_subscription_rel_srsubid,
     625              :                 BTEqualStrategyNumber, F_OIDEQ,
     626              :                 ObjectIdGetDatum(subid));
     627              :     /* Optional second key: srsubstate different from SUBREL_STATE_READY. */
     628         1226 :     if (not_ready)
     629         1186 :         ScanKeyInit(&skey[nkeys++],
     630              :                     Anum_pg_subscription_rel_srsubstate,
     631              :                     BTEqualStrategyNumber, F_CHARNE,
     632              :                     CharGetDatum(SUBREL_STATE_READY));
     633              : 
     634         1226 :     scan = systable_beginscan(rel, InvalidOid, false,
     635              :                               NULL, nkeys, skey);
     636              : 
     637         3177 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     638              :     {
     639              :         Form_pg_subscription_rel subrel;
     640              :         SubscriptionRelState *relstate;
     641              :         Datum       d;
     642              :         bool        isnull;
     643              :         char        relkind;
     644              : 
     645         1951 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     646              : 
     647              :         /* Relation is either a sequence or a table */
     648         1951 :         relkind = get_rel_relkind(subrel->srrelid);
     649              :         Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
     650              :                relkind == RELKIND_PARTITIONED_TABLE);
     651              : 
     652              :         /* Skip sequences if they were not requested */
     653         1951 :         if ((relkind == RELKIND_SEQUENCE) && !sequences)
     654            0 :             continue;
     655              : 
     656              :         /* Skip tables if they were not requested */
     657         1951 :         if ((relkind == RELKIND_RELATION ||
     658         1911 :              relkind == RELKIND_PARTITIONED_TABLE) && !tables)
     659            0 :             continue;
     660              :         /* Build the result entry; a null srsublsn becomes InvalidXLogRecPtr. */
     661         1951 :         relstate = palloc_object(SubscriptionRelState);
     662         1951 :         relstate->relid = subrel->srrelid;
     663         1951 :         relstate->state = subrel->srsubstate;
     664         1951 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     665              :                             Anum_pg_subscription_rel_srsublsn, &isnull);
     666         1951 :         if (isnull)
     667         1594 :             relstate->lsn = InvalidXLogRecPtr;
     668              :         else
     669          357 :             relstate->lsn = DatumGetLSN(d);
     670              : 
     671         1951 :         res = lappend(res, relstate);
     672              :     }
     673              : 
     674              :     /* Cleanup: end the scan and release the lock taken by table_open */
     675         1226 :     systable_endscan(scan);
     676         1226 :     table_close(rel, AccessShareLock);
     677              : 
     678         1226 :     return res;
     679              : }
     680              : 
     681              : /*
     682              :  * Update the dead tuple retention status (subretentionactive) for the given subscription.
     683              :  */
     684              : void
     685            2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
     686              : {
     687              :     Relation    rel;
     688              :     bool        nulls[Natts_pg_subscription];
     689              :     bool        replaces[Natts_pg_subscription];
     690              :     Datum       values[Natts_pg_subscription];
     691              :     HeapTuple   tup;
     692              : 
     693              :     /* Look up the subscription in the catalog */
     694            2 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     695            2 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     696              : 
     697            2 :     if (!HeapTupleIsValid(tup))
     698            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     699              :     /* Take an AccessShareLock on the subscription object itself. */
     700            2 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     701              : 
     702              :     /* Form a new tuple; only columns flagged in replaces[] are changed. */
     703            2 :     memset(values, 0, sizeof(values));
     704            2 :     memset(nulls, false, sizeof(nulls));
     705            2 :     memset(replaces, false, sizeof(replaces));
     706              : 
     707              :     /* Set the retention status to the requested value. */
     708            2 :     values[Anum_pg_subscription_subretentionactive - 1] = BoolGetDatum(active);
     709            2 :     replaces[Anum_pg_subscription_subretentionactive - 1] = true;
     710              : 
     711              :     /* Update the catalog */
     712            2 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     713              :                             replaces);
     714            2 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     715            2 :     heap_freetuple(tup);
     716              :     /* NoLock: the RowExclusiveLock on the catalog is not released here. */
     717            2 :     table_close(rel, NoLock);
     718            2 : }
        

Generated by: LCOV version 2.0-1