LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 94.9 % 235 223
Test Date: 2026-03-19 13:16:01 Functions: 100.0 % 13 13
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_subscription.c
       4              :  *      replication subscriptions
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/backend/catalog/pg_subscription.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/genam.h"
      18              : #include "access/heapam.h"
      19              : #include "access/htup_details.h"
      20              : #include "access/tableam.h"
      21              : #include "catalog/indexing.h"
      22              : #include "catalog/pg_foreign_server.h"
      23              : #include "catalog/pg_subscription.h"
      24              : #include "catalog/pg_subscription_rel.h"
      25              : #include "catalog/pg_type.h"
      26              : #include "foreign/foreign.h"
      27              : #include "miscadmin.h"
      28              : #include "storage/lmgr.h"
      29              : #include "utils/acl.h"
      30              : #include "utils/array.h"
      31              : #include "utils/builtins.h"
      32              : #include "utils/fmgroids.h"
      33              : #include "utils/lsyscache.h"
      34              : #include "utils/pg_lsn.h"
      35              : #include "utils/rel.h"
      36              : #include "utils/syscache.h"
      37              : 
      38              : static List *textarray_to_stringlist(ArrayType *textarray);
      39              : 
      40              : /*
      41              :  * Add a comma-separated list of publication names to the 'dest' string.
      42              :  */
      43              : void
      44          536 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
      45              : {
      46              :     ListCell   *lc;
      47          536 :     bool        first = true;
      48              : 
      49              :     Assert(publications != NIL);
      50              : 
      51         1380 :     foreach(lc, publications)
      52              :     {
      53          844 :         char       *pubname = strVal(lfirst(lc));
      54              : 
      55          844 :         if (first)
      56          536 :             first = false;
      57              :         else
      58          308 :             appendStringInfoString(dest, ", ");
      59              : 
      60          844 :         if (quote_literal)
      61          834 :             appendStringInfoString(dest, quote_literal_cstr(pubname));
      62              :         else
      63              :         {
      64           10 :             appendStringInfoChar(dest, '"');
      65           10 :             appendStringInfoString(dest, pubname);
      66           10 :             appendStringInfoChar(dest, '"');
      67              :         }
      68              :     }
      69          536 : }
      70              : 
      71              : /*
      72              :  * Fetch the subscription from the syscache.
      73              :  */
      74              : Subscription *
      75          983 : GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
      76              : {
      77              :     HeapTuple   tup;
      78              :     Subscription *sub;
      79              :     Form_pg_subscription subform;
      80              :     Datum       datum;
      81              :     bool        isnull;
      82              : 
      83          983 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      84              : 
      85          983 :     if (!HeapTupleIsValid(tup))
      86              :     {
      87           58 :         if (missing_ok)
      88           58 :             return NULL;
      89              : 
      90            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
      91              :     }
      92              : 
      93          925 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
      94              : 
      95          925 :     sub = palloc_object(Subscription);
      96          925 :     sub->oid = subid;
      97          925 :     sub->dbid = subform->subdbid;
      98          925 :     sub->skiplsn = subform->subskiplsn;
      99          925 :     sub->name = pstrdup(NameStr(subform->subname));
     100          925 :     sub->owner = subform->subowner;
     101          925 :     sub->enabled = subform->subenabled;
     102          925 :     sub->binary = subform->subbinary;
     103          925 :     sub->stream = subform->substream;
     104          925 :     sub->twophasestate = subform->subtwophasestate;
     105          925 :     sub->disableonerr = subform->subdisableonerr;
     106          925 :     sub->passwordrequired = subform->subpasswordrequired;
     107          925 :     sub->runasowner = subform->subrunasowner;
     108          925 :     sub->failover = subform->subfailover;
     109          925 :     sub->retaindeadtuples = subform->subretaindeadtuples;
     110          925 :     sub->maxretention = subform->submaxretention;
     111          925 :     sub->retentionactive = subform->subretentionactive;
     112              : 
     113              :     /* Get conninfo */
     114          925 :     if (OidIsValid(subform->subserver))
     115              :     {
     116              :         AclResult   aclresult;
     117              : 
     118              :         /* recheck ACL if requested */
     119           10 :         if (aclcheck)
     120              :         {
     121            5 :             aclresult = object_aclcheck(ForeignServerRelationId,
     122              :                                         subform->subserver,
     123              :                                         subform->subowner, ACL_USAGE);
     124              : 
     125            5 :             if (aclresult != ACLCHECK_OK)
     126            0 :                 ereport(ERROR,
     127              :                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     128              :                          errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
     129              :                                 GetUserNameFromId(subform->subowner, false),
     130              :                                 ForeignServerName(subform->subserver))));
     131              :         }
     132              : 
     133           10 :         sub->conninfo = ForeignServerConnectionString(subform->subowner,
     134              :                                                       subform->subserver);
     135              :     }
     136              :     else
     137              :     {
     138          915 :         datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     139              :                                        tup,
     140              :                                        Anum_pg_subscription_subconninfo);
     141          915 :         sub->conninfo = TextDatumGetCString(datum);
     142              :     }
     143              : 
     144              :     /* Get slotname */
     145          925 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     146              :                             tup,
     147              :                             Anum_pg_subscription_subslotname,
     148              :                             &isnull);
     149          925 :     if (!isnull)
     150          881 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
     151              :     else
     152           44 :         sub->slotname = NULL;
     153              : 
     154              :     /* Get synccommit */
     155          925 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     156              :                                    tup,
     157              :                                    Anum_pg_subscription_subsynccommit);
     158          925 :     sub->synccommit = TextDatumGetCString(datum);
     159              : 
     160              :     /* Get walrcvtimeout */
     161          925 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     162              :                                    tup,
     163              :                                    Anum_pg_subscription_subwalrcvtimeout);
     164          925 :     sub->walrcvtimeout = TextDatumGetCString(datum);
     165              : 
     166              :     /* Get publications */
     167          925 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     168              :                                    tup,
     169              :                                    Anum_pg_subscription_subpublications);
     170          925 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     171              : 
     172              :     /* Get origin */
     173          925 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     174              :                                    tup,
     175              :                                    Anum_pg_subscription_suborigin);
     176          925 :     sub->origin = TextDatumGetCString(datum);
     177              : 
     178              :     /* Is the subscription owner a superuser? */
     179          925 :     sub->ownersuperuser = superuser_arg(sub->owner);
     180              : 
     181          925 :     ReleaseSysCache(tup);
     182              : 
     183          925 :     return sub;
     184              : }
     185              : 
     186              : /*
     187              :  * Return number of subscriptions defined in given database.
     188              :  * Used by dropdb() to check if database can indeed be dropped.
     189              :  */
     190              : int
     191           51 : CountDBSubscriptions(Oid dbid)
     192              : {
     193           51 :     int         nsubs = 0;
     194              :     Relation    rel;
     195              :     ScanKeyData scankey;
     196              :     SysScanDesc scan;
     197              :     HeapTuple   tup;
     198              : 
     199           51 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     200              : 
     201           51 :     ScanKeyInit(&scankey,
     202              :                 Anum_pg_subscription_subdbid,
     203              :                 BTEqualStrategyNumber, F_OIDEQ,
     204              :                 ObjectIdGetDatum(dbid));
     205              : 
     206           51 :     scan = systable_beginscan(rel, InvalidOid, false,
     207              :                               NULL, 1, &scankey);
     208              : 
     209           51 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     210            0 :         nsubs++;
     211              : 
     212           51 :     systable_endscan(scan);
     213              : 
     214           51 :     table_close(rel, NoLock);
     215              : 
     216           51 :     return nsubs;
     217              : }
     218              : 
     219              : /*
     220              :  * Free memory allocated by subscription struct.
     221              :  */
     222              : void
     223           40 : FreeSubscription(Subscription *sub)
     224              : {
     225           40 :     pfree(sub->name);
     226           40 :     pfree(sub->conninfo);
     227           40 :     if (sub->slotname)
     228           40 :         pfree(sub->slotname);
     229           40 :     list_free_deep(sub->publications);
     230           40 :     pfree(sub);
     231           40 : }
     232              : 
     233              : /*
     234              :  * Disable the given subscription.
     235              :  */
     236              : void
     237            4 : DisableSubscription(Oid subid)
     238              : {
     239              :     Relation    rel;
     240              :     bool        nulls[Natts_pg_subscription];
     241              :     bool        replaces[Natts_pg_subscription];
     242              :     Datum       values[Natts_pg_subscription];
     243              :     HeapTuple   tup;
     244              : 
     245              :     /* Look up the subscription in the catalog */
     246            4 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     247            4 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     248              : 
     249            4 :     if (!HeapTupleIsValid(tup))
     250            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     251              : 
     252            4 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     253              : 
     254              :     /* Form a new tuple. */
     255            4 :     memset(values, 0, sizeof(values));
     256            4 :     memset(nulls, false, sizeof(nulls));
     257            4 :     memset(replaces, false, sizeof(replaces));
     258              : 
     259              :     /* Set the subscription to disabled. */
     260            4 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     261            4 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
     262              : 
     263              :     /* Update the catalog */
     264            4 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     265              :                             replaces);
     266            4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     267            4 :     heap_freetuple(tup);
     268              : 
     269            4 :     table_close(rel, NoLock);
     270            4 : }
     271              : 
     272              : /*
     273              :  * Convert text array to list of strings.
     274              :  *
     275              :  * Note: the resulting list of strings is pallocated here.
     276              :  */
     277              : static List *
     278          925 : textarray_to_stringlist(ArrayType *textarray)
     279              : {
     280              :     Datum      *elems;
     281              :     int         nelems,
     282              :                 i;
     283          925 :     List       *res = NIL;
     284              : 
     285          925 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
     286              : 
     287          925 :     if (nelems == 0)
     288            0 :         return NIL;
     289              : 
     290         2273 :     for (i = 0; i < nelems; i++)
     291         1348 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     292              : 
     293          925 :     return res;
     294              : }
     295              : 
     296              : /*
     297              :  * Add new state record for a subscription table.
     298              :  *
     299              :  * If retain_lock is true, then don't release the locks taken in this function.
     300              :  * We normally release the locks at the end of transaction but in binary-upgrade
     301              :  * mode, we expect to release those immediately.
     302              :  */
     303              : void
     304          223 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     305              :                         XLogRecPtr sublsn, bool retain_lock)
     306              : {
     307              :     Relation    rel;
     308              :     HeapTuple   tup;
     309              :     bool        nulls[Natts_pg_subscription_rel];
     310              :     Datum       values[Natts_pg_subscription_rel];
     311              : 
     312          223 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     313              : 
     314          223 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     315              : 
     316              :     /* Try finding existing mapping. */
     317          223 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     318              :                               ObjectIdGetDatum(relid),
     319              :                               ObjectIdGetDatum(subid));
     320          223 :     if (HeapTupleIsValid(tup))
     321            0 :         elog(ERROR, "subscription relation %u in subscription %u already exists",
     322              :              relid, subid);
     323              : 
     324              :     /* Form the tuple. */
     325          223 :     memset(values, 0, sizeof(values));
     326          223 :     memset(nulls, false, sizeof(nulls));
     327          223 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     328          223 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     329          223 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     330          223 :     if (XLogRecPtrIsValid(sublsn))
     331            2 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     332              :     else
     333          221 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     334              : 
     335          223 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     336              : 
     337              :     /* Insert tuple into catalog. */
     338          223 :     CatalogTupleInsert(rel, tup);
     339              : 
     340          223 :     heap_freetuple(tup);
     341              : 
     342              :     /* Cleanup. */
     343          223 :     if (retain_lock)
     344              :     {
     345          220 :         table_close(rel, NoLock);
     346              :     }
     347              :     else
     348              :     {
     349            3 :         table_close(rel, RowExclusiveLock);
     350            3 :         UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     351              :     }
     352          223 : }
     353              : 
     354              : /*
     355              :  * Update the state of a subscription table.
     356              :  */
     357              : void
     358          796 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     359              :                            XLogRecPtr sublsn, bool already_locked)
     360              : {
     361              :     Relation    rel;
     362              :     HeapTuple   tup;
     363              :     bool        nulls[Natts_pg_subscription_rel];
     364              :     Datum       values[Natts_pg_subscription_rel];
     365              :     bool        replaces[Natts_pg_subscription_rel];
     366              : 
     367          796 :     if (already_locked)
     368              :     {
     369              : #ifdef USE_ASSERT_CHECKING
     370              :         LOCKTAG     tag;
     371              : 
     372              :         Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
     373              :                                           RowExclusiveLock, true));
     374              :         SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
     375              :         Assert(LockHeldByMe(&tag, AccessShareLock, true));
     376              : #endif
     377              : 
     378          190 :         rel = table_open(SubscriptionRelRelationId, NoLock);
     379              :     }
     380              :     else
     381              :     {
     382          606 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     383          606 :         rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     384              :     }
     385              : 
     386              :     /* Try finding existing mapping. */
     387          796 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     388              :                               ObjectIdGetDatum(relid),
     389              :                               ObjectIdGetDatum(subid));
     390          796 :     if (!HeapTupleIsValid(tup))
     391            0 :         elog(ERROR, "subscription relation %u in subscription %u does not exist",
     392              :              relid, subid);
     393              : 
     394              :     /* Update the tuple. */
     395          796 :     memset(values, 0, sizeof(values));
     396          796 :     memset(nulls, false, sizeof(nulls));
     397          796 :     memset(replaces, false, sizeof(replaces));
     398              : 
     399          796 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     400          796 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     401              : 
     402          796 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     403          796 :     if (XLogRecPtrIsValid(sublsn))
     404          392 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     405              :     else
     406          404 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     407              : 
     408          796 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     409              :                             replaces);
     410              : 
     411              :     /* Update the catalog. */
     412          796 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     413              : 
     414              :     /* Cleanup. */
     415          796 :     table_close(rel, NoLock);
     416          796 : }
     417              : 
     418              : /*
     419              :  * Get state of subscription table.
     420              :  *
     421              :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     422              :  */
     423              : char
     424         1261 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     425              : {
     426              :     HeapTuple   tup;
     427              :     char        substate;
     428              :     bool        isnull;
     429              :     Datum       d;
     430              :     Relation    rel;
     431              : 
     432              :     /*
     433              :      * This is to avoid the race condition with AlterSubscription which tries
     434              :      * to remove this relstate.
     435              :      */
     436         1261 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     437              : 
     438              :     /* Try finding the mapping. */
     439         1261 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     440              :                           ObjectIdGetDatum(relid),
     441              :                           ObjectIdGetDatum(subid));
     442              : 
     443         1261 :     if (!HeapTupleIsValid(tup))
     444              :     {
     445           24 :         table_close(rel, AccessShareLock);
     446           24 :         *sublsn = InvalidXLogRecPtr;
     447           24 :         return SUBREL_STATE_UNKNOWN;
     448              :     }
     449              : 
     450              :     /* Get the state. */
     451         1237 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     452              : 
     453              :     /* Get the LSN */
     454         1237 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     455              :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     456         1237 :     if (isnull)
     457          675 :         *sublsn = InvalidXLogRecPtr;
     458              :     else
     459          562 :         *sublsn = DatumGetLSN(d);
     460              : 
     461              :     /* Cleanup */
     462         1237 :     ReleaseSysCache(tup);
     463              : 
     464         1237 :     table_close(rel, AccessShareLock);
     465              : 
     466         1237 :     return substate;
     467              : }
     468              : 
     469              : /*
     470              :  * Drop subscription relation mapping. These can be for a particular
     471              :  * subscription, or for a particular relation, or both.
     472              :  */
     473              : void
     474        32043 : RemoveSubscriptionRel(Oid subid, Oid relid)
     475              : {
     476              :     Relation    rel;
     477              :     TableScanDesc scan;
     478              :     ScanKeyData skey[2];
     479              :     HeapTuple   tup;
     480        32043 :     int         nkeys = 0;
     481              : 
     482        32043 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     483              : 
     484        32043 :     if (OidIsValid(subid))
     485              :     {
     486          163 :         ScanKeyInit(&skey[nkeys++],
     487              :                     Anum_pg_subscription_rel_srsubid,
     488              :                     BTEqualStrategyNumber,
     489              :                     F_OIDEQ,
     490              :                     ObjectIdGetDatum(subid));
     491              :     }
     492              : 
     493        32043 :     if (OidIsValid(relid))
     494              :     {
     495        31901 :         ScanKeyInit(&skey[nkeys++],
     496              :                     Anum_pg_subscription_rel_srrelid,
     497              :                     BTEqualStrategyNumber,
     498              :                     F_OIDEQ,
     499              :                     ObjectIdGetDatum(relid));
     500              :     }
     501              : 
     502              :     /* Do the search and delete what we found. */
     503        32043 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     504        32168 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     505              :     {
     506              :         Form_pg_subscription_rel subrel;
     507              : 
     508          125 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     509              : 
     510              :         /*
     511              :          * We don't allow to drop the relation mapping when the table
     512              :          * synchronization is in progress unless the caller updates the
     513              :          * corresponding subscription as well. This is to ensure that we don't
     514              :          * leave tablesync slots or origins in the system when the
     515              :          * corresponding table is dropped. For sequences, however, it's ok to
     516              :          * drop them since no separate slots or origins are created during
     517              :          * synchronization.
     518              :          */
     519          125 :         if (!OidIsValid(subid) &&
     520           16 :             subrel->srsubstate != SUBREL_STATE_READY &&
     521            0 :             get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
     522              :         {
     523            0 :             ereport(ERROR,
     524              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     525              :                      errmsg("could not drop relation mapping for subscription \"%s\"",
     526              :                             get_subscription_name(subrel->srsubid, false)),
     527              :                      errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
     528              :                                get_rel_name(relid), subrel->srsubstate),
     529              : 
     530              :             /*
     531              :              * translator: first %s is a SQL ALTER command and second %s is a
     532              :              * SQL DROP command
     533              :              */
     534              :                      errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
     535              :                              "ALTER SUBSCRIPTION ... ENABLE",
     536              :                              "DROP SUBSCRIPTION ...")));
     537              :         }
     538              : 
     539          125 :         CatalogTupleDelete(rel, &tup->t_self);
     540              :     }
     541        32043 :     table_endscan(scan);
     542              : 
     543        32043 :     table_close(rel, RowExclusiveLock);
     544        32043 : }
     545              : 
     546              : /*
     547              :  * Does the subscription have any tables?
     548              :  *
     549              :  * Use this function only to know true/false, and when you have no need for the
     550              :  * List returned by GetSubscriptionRelations.
     551              :  */
     552              : bool
     553          279 : HasSubscriptionTables(Oid subid)
     554              : {
     555              :     Relation    rel;
     556              :     ScanKeyData skey[1];
     557              :     SysScanDesc scan;
     558              :     HeapTuple   tup;
     559          279 :     bool        has_subtables = false;
     560              : 
     561          279 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     562              : 
     563          279 :     ScanKeyInit(&skey[0],
     564              :                 Anum_pg_subscription_rel_srsubid,
     565              :                 BTEqualStrategyNumber, F_OIDEQ,
     566              :                 ObjectIdGetDatum(subid));
     567              : 
     568          279 :     scan = systable_beginscan(rel, InvalidOid, false,
     569              :                               NULL, 1, skey);
     570              : 
     571          317 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     572              :     {
     573              :         Form_pg_subscription_rel subrel;
     574              :         char        relkind;
     575              : 
     576          302 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     577          302 :         relkind = get_rel_relkind(subrel->srrelid);
     578              : 
     579          302 :         if (relkind == RELKIND_RELATION ||
     580              :             relkind == RELKIND_PARTITIONED_TABLE)
     581              :         {
     582          264 :             has_subtables = true;
     583          264 :             break;
     584              :         }
     585              :     }
     586              : 
     587              :     /* Cleanup */
     588          279 :     systable_endscan(scan);
     589          279 :     table_close(rel, AccessShareLock);
     590              : 
     591          279 :     return has_subtables;
     592              : }
     593              : 
     594              : /*
     595              :  * Get the relations for the subscription.
     596              :  *
     597              :  * If not_ready is true, return only the relations that are not in a ready
     598              :  * state, otherwise return all the relations of the subscription.  The
     599              :  * returned list is palloc'ed in the current memory context.
     600              :  */
     601              : List *
     602         1185 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
     603              :                          bool not_ready)
     604              : {
     605         1185 :     List       *res = NIL;
     606              :     Relation    rel;
     607              :     HeapTuple   tup;
     608         1185 :     int         nkeys = 0;
     609              :     ScanKeyData skey[2];
     610              :     SysScanDesc scan;
     611              : 
     612              :     /* One or both of 'tables' and 'sequences' must be true. */
     613              :     Assert(tables || sequences);
     614              : 
     615         1185 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     616              : 
     617         1185 :     ScanKeyInit(&skey[nkeys++],
     618              :                 Anum_pg_subscription_rel_srsubid,
     619              :                 BTEqualStrategyNumber, F_OIDEQ,
     620              :                 ObjectIdGetDatum(subid));
     621              : 
     622         1185 :     if (not_ready)
     623         1147 :         ScanKeyInit(&skey[nkeys++],
     624              :                     Anum_pg_subscription_rel_srsubstate,
     625              :                     BTEqualStrategyNumber, F_CHARNE,
     626              :                     CharGetDatum(SUBREL_STATE_READY));
     627              : 
     628         1185 :     scan = systable_beginscan(rel, InvalidOid, false,
     629              :                               NULL, nkeys, skey);
     630              : 
     631         3103 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     632              :     {
     633              :         Form_pg_subscription_rel subrel;
     634              :         SubscriptionRelState *relstate;
     635              :         Datum       d;
     636              :         bool        isnull;
     637              :         char        relkind;
     638              : 
     639         1918 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     640              : 
     641              :         /* Relation is either a sequence or a table */
     642         1918 :         relkind = get_rel_relkind(subrel->srrelid);
     643              :         Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
     644              :                relkind == RELKIND_PARTITIONED_TABLE);
     645              : 
     646              :         /* Skip sequences if they were not requested */
     647         1918 :         if ((relkind == RELKIND_SEQUENCE) && !sequences)
     648            0 :             continue;
     649              : 
     650              :         /* Skip tables if they were not requested */
     651         1918 :         if ((relkind == RELKIND_RELATION ||
     652         1890 :              relkind == RELKIND_PARTITIONED_TABLE) && !tables)
     653            0 :             continue;
     654              : 
     655         1918 :         relstate = palloc_object(SubscriptionRelState);
     656         1918 :         relstate->relid = subrel->srrelid;
     657         1918 :         relstate->state = subrel->srsubstate;
     658         1918 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     659              :                             Anum_pg_subscription_rel_srsublsn, &isnull);
     660         1918 :         if (isnull)
     661         1580 :             relstate->lsn = InvalidXLogRecPtr;
     662              :         else
     663          338 :             relstate->lsn = DatumGetLSN(d);
     664              : 
     665         1918 :         res = lappend(res, relstate);
     666              :     }
     667              : 
     668              :     /* Cleanup */
     669         1185 :     systable_endscan(scan);
     670         1185 :     table_close(rel, AccessShareLock);
     671              : 
     672         1185 :     return res;
     673              : }
     674              : 
     675              : /*
     676              :  * Update the dead tuple retention status for the given subscription.
     677              :  */
     678              : void
     679            2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
     680              : {
     681              :     Relation    rel;
     682              :     bool        nulls[Natts_pg_subscription];
     683              :     bool        replaces[Natts_pg_subscription];
     684              :     Datum       values[Natts_pg_subscription];
     685              :     HeapTuple   tup;
     686              : 
     687              :     /* Look up the subscription in the catalog */
     688            2 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     689            2 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     690              : 
     691            2 :     if (!HeapTupleIsValid(tup))
     692            0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     693              : 
     694            2 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     695              : 
     696              :     /* Form a new tuple. */
     697            2 :     memset(values, 0, sizeof(values));
     698            2 :     memset(nulls, false, sizeof(nulls));
     699            2 :     memset(replaces, false, sizeof(replaces));
     700              : 
     701              :     /* Set the subscription to disabled. */
     702            2 :     values[Anum_pg_subscription_subretentionactive - 1] = active;
     703            2 :     replaces[Anum_pg_subscription_subretentionactive - 1] = true;
     704              : 
     705              :     /* Update the catalog */
     706            2 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     707              :                             replaces);
     708            2 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     709            2 :     heap_freetuple(tup);
     710              : 
     711            2 :     table_close(rel, NoLock);
     712            2 : }
        

Generated by: LCOV version 2.0-1