LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Coverage Total Hit
Test: PostgreSQL 20devel Lines: 94.9 % 236 224
Test Date: 2026-07-03 19:57:34 Functions: 100.0 % 12 12
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 71.0 % 100 71

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * pg_subscription.c
       4                 :             :  *      replication subscriptions
       5                 :             :  *
       6                 :             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7                 :             :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :             :  *
       9                 :             :  * IDENTIFICATION
      10                 :             :  *      src/backend/catalog/pg_subscription.c
      11                 :             :  *
      12                 :             :  *-------------------------------------------------------------------------
      13                 :             :  */
      14                 :             : 
      15                 :             : #include "postgres.h"
      16                 :             : 
      17                 :             : #include "access/genam.h"
      18                 :             : #include "access/heapam.h"
      19                 :             : #include "access/htup_details.h"
      20                 :             : #include "access/tableam.h"
      21                 :             : #include "catalog/indexing.h"
      22                 :             : #include "catalog/pg_foreign_server.h"
      23                 :             : #include "catalog/pg_subscription.h"
      24                 :             : #include "catalog/pg_subscription_rel.h"
      25                 :             : #include "catalog/pg_type.h"
      26                 :             : #include "foreign/foreign.h"
      27                 :             : #include "miscadmin.h"
      28                 :             : #include "storage/lmgr.h"
      29                 :             : #include "storage/lock.h"
      30                 :             : #include "utils/acl.h"
      31                 :             : #include "utils/array.h"
      32                 :             : #include "utils/builtins.h"
      33                 :             : #include "utils/fmgroids.h"
      34                 :             : #include "utils/lsyscache.h"
      35                 :             : #include "utils/memutils.h"
      36                 :             : #include "utils/pg_lsn.h"
      37                 :             : #include "utils/rel.h"
      38                 :             : #include "utils/syscache.h"
      39                 :             : 
      40                 :             : static List *textarray_to_stringlist(ArrayType *textarray);
      41                 :             : 
      42                 :             : /*
      43                 :             :  * Add a comma-separated list of publication names to the 'dest' string.
      44                 :             :  *
      45                 :             :  * If quote_literal is true, the returned list can be used to construct an SQL
      46                 :             :  * command, thus no translation is applied.  Otherwise, the string can be used
      47                 :             :  * to create a user-facing message, so translatable quote marks are added.
      48                 :             :  */
      49                 :             : void
      50                 :         543 : GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
      51                 :             : {
      52                 :             :     ListCell   *lc;
      53                 :         543 :     bool        first = true;
      54                 :             : 
      55                 :             :     Assert(publications != NIL);
      56                 :             : 
      57   [ +  -  +  +  :        1394 :     foreach(lc, publications)
                   +  + ]
      58                 :             :     {
      59                 :         851 :         char       *pubname = strVal(lfirst(lc));
      60                 :             : 
      61         [ +  + ]:         851 :         if (quote_literal)
      62                 :             :         {
      63         [ +  + ]:         841 :             if (!first)
      64                 :         307 :                 appendStringInfoString(dest, ", ");
      65                 :         841 :             appendStringInfoString(dest, quote_literal_cstr(pubname));
      66                 :             :         }
      67                 :             :         else
      68                 :             :         {
      69         [ +  + ]:          10 :             if (first)
      70                 :           9 :                 appendStringInfo(dest, _("\"%s\""), pubname);
      71                 :             :             else
      72                 :           1 :                 appendStringInfo(dest, _(", \"%s\""), pubname);
      73                 :             :         }
      74                 :             : 
      75                 :         851 :         first = false;
      76                 :             :     }
      77                 :         543 : }
      78                 :             : 
      79                 :             : /*
      80                 :             :  * Fetch the subscription from the syscache.
      81                 :             :  *
      82                 :             :  * If conninfo_needed is true, conninfo will be constructed, possibly
      83                 :             :  * encountering errors in ForeignServerConnectionString(). Callers not
      84                 :             :  * expecting such errors should pass false, in which case conninfo will be
      85                 :             :  * NULL.
      86                 :             :  */
      87                 :             : Subscription *
      88                 :        1057 : GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed,
      89                 :             :                 bool conninfo_aclcheck)
      90                 :             : {
      91                 :             :     HeapTuple   tup;
      92                 :             :     Subscription *sub;
      93                 :             :     Form_pg_subscription subform;
      94                 :             :     Datum       datum;
      95                 :             :     bool        isnull;
      96                 :             :     MemoryContext cxt;
      97                 :             :     MemoryContext oldcxt;
      98                 :             : 
      99                 :             :     Assert(conninfo_needed || !conninfo_aclcheck);
     100                 :             : 
     101                 :        1057 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     102                 :             : 
     103         [ +  + ]:        1057 :     if (!HeapTupleIsValid(tup))
     104                 :             :     {
     105         [ +  - ]:          66 :         if (missing_ok)
     106                 :          66 :             return NULL;
     107                 :             : 
     108         [ #  # ]:           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     109                 :             :     }
     110                 :             : 
     111                 :         991 :     cxt = AllocSetContextCreate(CurrentMemoryContext, "subscription",
     112                 :             :                                 ALLOCSET_SMALL_SIZES);
     113                 :         991 :     oldcxt = MemoryContextSwitchTo(cxt);
     114                 :             : 
     115                 :         991 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
     116                 :             : 
     117                 :         991 :     sub = palloc0_object(Subscription);
     118                 :         991 :     sub->cxt = cxt;
     119                 :         991 :     sub->oid = subid;
     120                 :         991 :     sub->dbid = subform->subdbid;
     121                 :         991 :     sub->skiplsn = subform->subskiplsn;
     122                 :         991 :     sub->name = pstrdup(NameStr(subform->subname));
     123                 :         991 :     sub->owner = subform->subowner;
     124                 :         991 :     sub->enabled = subform->subenabled;
     125                 :         991 :     sub->binary = subform->subbinary;
     126                 :         991 :     sub->stream = subform->substream;
     127                 :         991 :     sub->twophasestate = subform->subtwophasestate;
     128                 :         991 :     sub->disableonerr = subform->subdisableonerr;
     129                 :         991 :     sub->passwordrequired = subform->subpasswordrequired;
     130                 :         991 :     sub->runasowner = subform->subrunasowner;
     131                 :         991 :     sub->failover = subform->subfailover;
     132                 :         991 :     sub->retaindeadtuples = subform->subretaindeadtuples;
     133                 :         991 :     sub->maxretention = subform->submaxretention;
     134                 :         991 :     sub->retentionactive = subform->subretentionactive;
     135                 :         991 :     sub->conflictlogrelid = subform->subconflictlogrelid;
     136                 :             : 
     137         [ +  + ]:         991 :     if (conninfo_needed)
     138                 :             :     {
     139         [ +  + ]:         855 :         if (OidIsValid(subform->subserver))
     140                 :             :         {
     141                 :             :             AclResult   aclresult;
     142                 :             :             ForeignServer *server;
     143                 :             : 
     144                 :           5 :             server = GetForeignServer(subform->subserver);
     145                 :             : 
     146         [ +  - ]:           5 :             if (conninfo_aclcheck)
     147                 :             :             {
     148                 :             :                 /* recheck ACL if requested */
     149                 :           5 :                 aclresult = object_aclcheck(ForeignServerRelationId,
     150                 :             :                                             subform->subserver,
     151                 :             :                                             subform->subowner, ACL_USAGE);
     152                 :             : 
     153         [ -  + ]:           5 :                 if (aclresult != ACLCHECK_OK)
     154         [ #  # ]:           0 :                     ereport(ERROR,
     155                 :             :                             (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     156                 :             :                              errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
     157                 :             :                                     GetUserNameFromId(subform->subowner, false),
     158                 :             :                                     server->servername)));
     159                 :             :             }
     160                 :             : 
     161                 :           5 :             sub->conninfo = ForeignServerConnectionString(subform->subowner,
     162                 :             :                                                           server);
     163                 :             :         }
     164                 :             :         else
     165                 :             :         {
     166                 :         850 :             datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     167                 :             :                                            tup,
     168                 :             :                                            Anum_pg_subscription_subconninfo);
     169                 :         850 :             sub->conninfo = TextDatumGetCString(datum);
     170                 :             :         }
     171                 :             :     }
     172                 :             : 
     173                 :             :     /* Get slotname */
     174                 :         991 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     175                 :             :                             tup,
     176                 :             :                             Anum_pg_subscription_subslotname,
     177                 :             :                             &isnull);
     178         [ +  + ]:         991 :     if (!isnull)
     179                 :         947 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
     180                 :             :     else
     181                 :          44 :         sub->slotname = NULL;
     182                 :             : 
     183                 :             :     /* Get synccommit */
     184                 :         991 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     185                 :             :                                    tup,
     186                 :             :                                    Anum_pg_subscription_subsynccommit);
     187                 :         991 :     sub->synccommit = TextDatumGetCString(datum);
     188                 :             : 
     189                 :             :     /* Get walrcvtimeout */
     190                 :         991 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     191                 :             :                                    tup,
     192                 :             :                                    Anum_pg_subscription_subwalrcvtimeout);
     193                 :         991 :     sub->walrcvtimeout = TextDatumGetCString(datum);
     194                 :             : 
     195                 :             :     /* Get publications */
     196                 :         991 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     197                 :             :                                    tup,
     198                 :             :                                    Anum_pg_subscription_subpublications);
     199                 :         991 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     200                 :             : 
     201                 :             :     /* Get origin */
     202                 :         991 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     203                 :             :                                    tup,
     204                 :             :                                    Anum_pg_subscription_suborigin);
     205                 :         991 :     sub->origin = TextDatumGetCString(datum);
     206                 :             : 
     207                 :             :     /* Get conflict log destination */
     208                 :         991 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     209                 :             :                                    tup,
     210                 :             :                                    Anum_pg_subscription_subconflictlogdest);
     211                 :         991 :     sub->conflictlogdest = TextDatumGetCString(datum);
     212                 :             : 
     213                 :             :     /* Is the subscription owner a superuser? */
     214                 :         991 :     sub->ownersuperuser = superuser_arg(sub->owner);
     215                 :             : 
     216                 :         991 :     ReleaseSysCache(tup);
     217                 :             : 
     218                 :         991 :     MemoryContextSwitchTo(oldcxt);
     219                 :             : 
     220                 :         991 :     return sub;
     221                 :             : }
     222                 :             : 
     223                 :             : /*
     224                 :             :  * Return number of subscriptions defined in given database.
     225                 :             :  * Used by dropdb() to check if database can indeed be dropped.
     226                 :             :  */
     227                 :             : int
     228                 :          50 : CountDBSubscriptions(Oid dbid)
     229                 :             : {
     230                 :          50 :     int         nsubs = 0;
     231                 :             :     Relation    rel;
     232                 :             :     ScanKeyData scankey;
     233                 :             :     SysScanDesc scan;
     234                 :             :     HeapTuple   tup;
     235                 :             : 
     236                 :          50 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     237                 :             : 
     238                 :          50 :     ScanKeyInit(&scankey,
     239                 :             :                 Anum_pg_subscription_subdbid,
     240                 :             :                 BTEqualStrategyNumber, F_OIDEQ,
     241                 :             :                 ObjectIdGetDatum(dbid));
     242                 :             : 
     243                 :          50 :     scan = systable_beginscan(rel, InvalidOid, false,
     244                 :             :                               NULL, 1, &scankey);
     245                 :             : 
     246         [ -  + ]:          50 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     247                 :           0 :         nsubs++;
     248                 :             : 
     249                 :          50 :     systable_endscan(scan);
     250                 :             : 
     251                 :          50 :     table_close(rel, NoLock);
     252                 :             : 
     253                 :          50 :     return nsubs;
     254                 :             : }
     255                 :             : 
     256                 :             : /*
     257                 :             :  * Disable the given subscription.
     258                 :             :  */
     259                 :             : void
     260                 :           4 : DisableSubscription(Oid subid)
     261                 :             : {
     262                 :             :     Relation    rel;
     263                 :             :     bool        nulls[Natts_pg_subscription];
     264                 :             :     bool        replaces[Natts_pg_subscription];
     265                 :             :     Datum       values[Natts_pg_subscription];
     266                 :             :     HeapTuple   tup;
     267                 :             : 
     268                 :             :     /* Look up the subscription in the catalog */
     269                 :           4 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     270                 :           4 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     271                 :             : 
     272         [ -  + ]:           4 :     if (!HeapTupleIsValid(tup))
     273         [ #  # ]:           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     274                 :             : 
     275                 :           4 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     276                 :             : 
     277                 :             :     /* Form a new tuple. */
     278                 :           4 :     memset(values, 0, sizeof(values));
     279                 :           4 :     memset(nulls, false, sizeof(nulls));
     280                 :           4 :     memset(replaces, false, sizeof(replaces));
     281                 :             : 
     282                 :             :     /* Set the subscription to disabled. */
     283                 :           4 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     284                 :           4 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
     285                 :             : 
     286                 :             :     /* Update the catalog */
     287                 :           4 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     288                 :             :                             replaces);
     289                 :           4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     290                 :           4 :     heap_freetuple(tup);
     291                 :             : 
     292                 :           4 :     table_close(rel, NoLock);
     293                 :           4 : }
     294                 :             : 
     295                 :             : /*
     296                 :             :  * Convert text array to list of strings.
     297                 :             :  *
     298                 :             :  * Note: the resulting list of strings is pallocated here.
     299                 :             :  */
     300                 :             : static List *
     301                 :         991 : textarray_to_stringlist(ArrayType *textarray)
     302                 :             : {
     303                 :             :     Datum      *elems;
     304                 :             :     int         nelems,
     305                 :             :                 i;
     306                 :         991 :     List       *res = NIL;
     307                 :             : 
     308                 :         991 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
     309                 :             : 
     310         [ -  + ]:         991 :     if (nelems == 0)
     311                 :           0 :         return NIL;
     312                 :             : 
     313         [ +  + ]:        2383 :     for (i = 0; i < nelems; i++)
     314                 :        1392 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     315                 :             : 
     316                 :         991 :     return res;
     317                 :             : }
     318                 :             : 
     319                 :             : /*
     320                 :             :  * Add new state record for a subscription table.
     321                 :             :  *
     322                 :             :  * If retain_lock is true, then don't release the locks taken in this function.
     323                 :             :  * We normally release the locks at the end of transaction but in binary-upgrade
     324                 :             :  * mode, we expect to release those immediately.
     325                 :             :  */
     326                 :             : void
     327                 :         228 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     328                 :             :                         XLogRecPtr sublsn, bool retain_lock)
     329                 :             : {
     330                 :             :     Relation    rel;
     331                 :             :     HeapTuple   tup;
     332                 :             :     bool        nulls[Natts_pg_subscription_rel];
     333                 :             :     Datum       values[Natts_pg_subscription_rel];
     334                 :             : 
     335                 :         228 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     336                 :             : 
     337                 :         228 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     338                 :             : 
     339                 :             :     /* Try finding existing mapping. */
     340                 :         228 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     341                 :             :                               ObjectIdGetDatum(relid),
     342                 :             :                               ObjectIdGetDatum(subid));
     343         [ -  + ]:         228 :     if (HeapTupleIsValid(tup))
     344         [ #  # ]:           0 :         elog(ERROR, "subscription relation %u in subscription %u already exists",
     345                 :             :              relid, subid);
     346                 :             : 
     347                 :             :     /* Form the tuple. */
     348                 :         228 :     memset(values, 0, sizeof(values));
     349                 :         228 :     memset(nulls, false, sizeof(nulls));
     350                 :         228 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     351                 :         228 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     352                 :         228 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     353         [ +  + ]:         228 :     if (XLogRecPtrIsValid(sublsn))
     354                 :           2 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     355                 :             :     else
     356                 :         226 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     357                 :             : 
     358                 :         228 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     359                 :             : 
     360                 :             :     /* Insert tuple into catalog. */
     361                 :         228 :     CatalogTupleInsert(rel, tup);
     362                 :             : 
     363                 :         228 :     heap_freetuple(tup);
     364                 :             : 
     365                 :             :     /* Cleanup. */
     366         [ +  + ]:         228 :     if (retain_lock)
     367                 :             :     {
     368                 :         225 :         table_close(rel, NoLock);
     369                 :             :     }
     370                 :             :     else
     371                 :             :     {
     372                 :           3 :         table_close(rel, RowExclusiveLock);
     373                 :           3 :         UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     374                 :             :     }
     375                 :         228 : }
     376                 :             : 
     377                 :             : /*
     378                 :             :  * Update the state of a subscription table.
     379                 :             :  */
     380                 :             : void
     381                 :         831 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     382                 :             :                            XLogRecPtr sublsn, bool already_locked)
     383                 :             : {
     384                 :             :     Relation    rel;
     385                 :             :     HeapTuple   tup;
     386                 :             :     bool        nulls[Natts_pg_subscription_rel];
     387                 :             :     Datum       values[Natts_pg_subscription_rel];
     388                 :             :     bool        replaces[Natts_pg_subscription_rel];
     389                 :             : 
     390         [ +  + ]:         831 :     if (already_locked)
     391                 :             :     {
     392                 :             : #ifdef USE_ASSERT_CHECKING
     393                 :             :         LOCKTAG     tag;
     394                 :             : 
     395                 :             :         Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId,
     396                 :             :                                           RowExclusiveLock, true));
     397                 :             :         SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0);
     398                 :             :         Assert(LockHeldByMe(&tag, AccessShareLock, true));
     399                 :             : #endif
     400                 :             : 
     401                 :         193 :         rel = table_open(SubscriptionRelRelationId, NoLock);
     402                 :             :     }
     403                 :             :     else
     404                 :             :     {
     405                 :         638 :         LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     406                 :         638 :         rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     407                 :             :     }
     408                 :             : 
     409                 :             :     /* Try finding existing mapping. */
     410                 :         831 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     411                 :             :                               ObjectIdGetDatum(relid),
     412                 :             :                               ObjectIdGetDatum(subid));
     413         [ -  + ]:         831 :     if (!HeapTupleIsValid(tup))
     414         [ #  # ]:           0 :         elog(ERROR, "subscription relation %u in subscription %u does not exist",
     415                 :             :              relid, subid);
     416                 :             : 
     417                 :             :     /* Update the tuple. */
     418                 :         831 :     memset(values, 0, sizeof(values));
     419                 :         831 :     memset(nulls, false, sizeof(nulls));
     420                 :         831 :     memset(replaces, false, sizeof(replaces));
     421                 :             : 
     422                 :         831 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     423                 :         831 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     424                 :             : 
     425                 :         831 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     426         [ +  + ]:         831 :     if (XLogRecPtrIsValid(sublsn))
     427                 :         408 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     428                 :             :     else
     429                 :         423 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     430                 :             : 
     431                 :         831 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     432                 :             :                             replaces);
     433                 :             : 
     434                 :             :     /* Update the catalog. */
     435                 :         831 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     436                 :             : 
     437                 :             :     /* Cleanup. */
     438                 :         831 :     table_close(rel, NoLock);
     439                 :         831 : }
     440                 :             : 
     441                 :             : /*
     442                 :             :  * Get state of subscription table.
     443                 :             :  *
     444                 :             :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     445                 :             :  */
     446                 :             : char
     447                 :        1312 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     448                 :             : {
     449                 :             :     HeapTuple   tup;
     450                 :             :     char        substate;
     451                 :             :     bool        isnull;
     452                 :             :     Datum       d;
     453                 :             :     Relation    rel;
     454                 :             : 
     455                 :             :     /*
     456                 :             :      * This is to avoid the race condition with AlterSubscription which tries
     457                 :             :      * to remove this relstate.
     458                 :             :      */
     459                 :        1312 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     460                 :             : 
     461                 :             :     /* Try finding the mapping. */
     462                 :        1312 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     463                 :             :                           ObjectIdGetDatum(relid),
     464                 :             :                           ObjectIdGetDatum(subid));
     465                 :             : 
     466         [ +  + ]:        1312 :     if (!HeapTupleIsValid(tup))
     467                 :             :     {
     468                 :          35 :         table_close(rel, AccessShareLock);
     469                 :          35 :         *sublsn = InvalidXLogRecPtr;
     470                 :          35 :         return SUBREL_STATE_UNKNOWN;
     471                 :             :     }
     472                 :             : 
     473                 :             :     /* Get the state. */
     474                 :        1277 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     475                 :             : 
     476                 :             :     /* Get the LSN */
     477                 :        1277 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     478                 :             :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     479         [ +  + ]:        1277 :     if (isnull)
     480                 :         706 :         *sublsn = InvalidXLogRecPtr;
     481                 :             :     else
     482                 :         571 :         *sublsn = DatumGetLSN(d);
     483                 :             : 
     484                 :             :     /* Cleanup */
     485                 :        1277 :     ReleaseSysCache(tup);
     486                 :             : 
     487                 :        1277 :     table_close(rel, AccessShareLock);
     488                 :             : 
     489                 :        1277 :     return substate;
     490                 :             : }
     491                 :             : 
     492                 :             : /*
     493                 :             :  * Drop subscription relation mapping. These can be for a particular
     494                 :             :  * subscription, or for a particular relation, or both.
     495                 :             :  */
     496                 :             : void
     497                 :       34165 : RemoveSubscriptionRel(Oid subid, Oid relid)
     498                 :             : {
     499                 :             :     Relation    rel;
     500                 :             :     TableScanDesc scan;
     501                 :             :     ScanKeyData skey[2];
     502                 :             :     HeapTuple   tup;
     503                 :       34165 :     int         nkeys = 0;
     504                 :             : 
     505                 :       34165 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     506                 :             : 
     507         [ +  + ]:       34165 :     if (OidIsValid(subid))
     508                 :             :     {
     509                 :         189 :         ScanKeyInit(&skey[nkeys++],
     510                 :             :                     Anum_pg_subscription_rel_srsubid,
     511                 :             :                     BTEqualStrategyNumber,
     512                 :             :                     F_OIDEQ,
     513                 :             :                     ObjectIdGetDatum(subid));
     514                 :             :     }
     515                 :             : 
     516         [ +  + ]:       34165 :     if (OidIsValid(relid))
     517                 :             :     {
     518                 :       33997 :         ScanKeyInit(&skey[nkeys++],
     519                 :             :                     Anum_pg_subscription_rel_srrelid,
     520                 :             :                     BTEqualStrategyNumber,
     521                 :             :                     F_OIDEQ,
     522                 :             :                     ObjectIdGetDatum(relid));
     523                 :             :     }
     524                 :             : 
     525                 :             :     /* Do the search and delete what we found. */
     526                 :       34165 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     527         [ +  + ]:       34295 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     528                 :             :     {
     529                 :             :         Form_pg_subscription_rel subrel;
     530                 :             : 
     531                 :         130 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     532                 :             : 
     533                 :             :         /*
     534                 :             :          * We don't allow to drop the relation mapping when the table
     535                 :             :          * synchronization is in progress unless the caller updates the
     536                 :             :          * corresponding subscription as well. This is to ensure that we don't
     537                 :             :          * leave tablesync slots or origins in the system when the
     538                 :             :          * corresponding table is dropped. For sequences, however, it's ok to
     539                 :             :          * drop them since no separate slots or origins are created during
     540                 :             :          * synchronization.
     541                 :             :          */
     542         [ +  + ]:         130 :         if (!OidIsValid(subid) &&
     543   [ -  +  -  - ]:          16 :             subrel->srsubstate != SUBREL_STATE_READY &&
     544                 :           0 :             get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE)
     545                 :             :         {
     546         [ #  # ]:           0 :             ereport(ERROR,
     547                 :             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     548                 :             :                      errmsg("could not drop relation mapping for subscription \"%s\"",
     549                 :             :                             get_subscription_name(subrel->srsubid, false)),
     550                 :             :                      errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
     551                 :             :                                get_rel_name(relid), subrel->srsubstate),
     552                 :             : 
     553                 :             :             /*
     554                 :             :              * translator: first %s is a SQL ALTER command and second %s is a
     555                 :             :              * SQL DROP command
     556                 :             :              */
     557                 :             :                      errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
     558                 :             :                              "ALTER SUBSCRIPTION ... ENABLE",
     559                 :             :                              "DROP SUBSCRIPTION ...")));
     560                 :             :         }
     561                 :             : 
     562                 :         130 :         CatalogTupleDelete(rel, &tup->t_self);
     563                 :             :     }
     564                 :       34165 :     table_endscan(scan);
     565                 :             : 
     566                 :       34165 :     table_close(rel, RowExclusiveLock);
     567                 :       34165 : }
     568                 :             : 
     569                 :             : /*
     570                 :             :  * Does the subscription have any tables?
     571                 :             :  *
     572                 :             :  * Use this function only to know true/false, and when you have no need for the
     573                 :             :  * List returned by GetSubscriptionRelations.
     574                 :             :  */
     575                 :             : bool
     576                 :         271 : HasSubscriptionTables(Oid subid)
     577                 :             : {
     578                 :             :     Relation    rel;
     579                 :             :     ScanKeyData skey[1];
     580                 :             :     SysScanDesc scan;
     581                 :             :     HeapTuple   tup;
     582                 :         271 :     bool        has_subtables = false;
     583                 :             : 
     584                 :         271 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     585                 :             : 
     586                 :         271 :     ScanKeyInit(&skey[0],
     587                 :             :                 Anum_pg_subscription_rel_srsubid,
     588                 :             :                 BTEqualStrategyNumber, F_OIDEQ,
     589                 :             :                 ObjectIdGetDatum(subid));
     590                 :             : 
     591                 :         271 :     scan = systable_beginscan(rel, InvalidOid, false,
     592                 :             :                               NULL, 1, skey);
     593                 :             : 
     594         [ +  + ]:         325 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     595                 :             :     {
     596                 :             :         Form_pg_subscription_rel subrel;
     597                 :             :         char        relkind;
     598                 :             : 
     599                 :         307 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     600                 :         307 :         relkind = get_rel_relkind(subrel->srrelid);
     601                 :             : 
     602   [ +  +  +  + ]:         307 :         if (relkind == RELKIND_RELATION ||
     603                 :             :             relkind == RELKIND_PARTITIONED_TABLE)
     604                 :             :         {
     605                 :         253 :             has_subtables = true;
     606                 :         253 :             break;
     607                 :             :         }
     608                 :             :     }
     609                 :             : 
     610                 :             :     /* Cleanup */
     611                 :         271 :     systable_endscan(scan);
     612                 :         271 :     table_close(rel, AccessShareLock);
     613                 :             : 
     614                 :         271 :     return has_subtables;
     615                 :             : }
     616                 :             : 
     617                 :             : /*
     618                 :             :  * Get the relations for the subscription.
     619                 :             :  *
     620                 :             :  * If not_ready is true, return only the relations that are not in a ready
     621                 :             :  * state, otherwise return all the relations of the subscription.  The
     622                 :             :  * returned list is palloc'ed in the current memory context.
     623                 :             :  */
     624                 :             : List *
     625                 :        1229 : GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
     626                 :             :                          bool not_ready)
     627                 :             : {
     628                 :        1229 :     List       *res = NIL;
     629                 :             :     Relation    rel;
     630                 :             :     HeapTuple   tup;
     631                 :        1229 :     int         nkeys = 0;
     632                 :             :     ScanKeyData skey[2];
     633                 :             :     SysScanDesc scan;
     634                 :             : 
     635                 :             :     /* One or both of 'tables' and 'sequences' must be true. */
     636                 :             :     Assert(tables || sequences);
     637                 :             : 
     638                 :        1229 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     639                 :             : 
     640                 :        1229 :     ScanKeyInit(&skey[nkeys++],
     641                 :             :                 Anum_pg_subscription_rel_srsubid,
     642                 :             :                 BTEqualStrategyNumber, F_OIDEQ,
     643                 :             :                 ObjectIdGetDatum(subid));
     644                 :             : 
     645         [ +  + ]:        1229 :     if (not_ready)
     646                 :        1188 :         ScanKeyInit(&skey[nkeys++],
     647                 :             :                     Anum_pg_subscription_rel_srsubstate,
     648                 :             :                     BTEqualStrategyNumber, F_CHARNE,
     649                 :             :                     CharGetDatum(SUBREL_STATE_READY));
     650                 :             : 
     651                 :        1229 :     scan = systable_beginscan(rel, InvalidOid, false,
     652                 :             :                               NULL, nkeys, skey);
     653                 :             : 
     654         [ +  + ]:        3227 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     655                 :             :     {
     656                 :             :         Form_pg_subscription_rel subrel;
     657                 :             :         SubscriptionRelState *relstate;
     658                 :             :         Datum       d;
     659                 :             :         bool        isnull;
     660                 :             :         char        relkind;
     661                 :             : 
     662                 :        1998 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     663                 :             : 
     664                 :             :         /* Relation is either a sequence or a table */
     665                 :        1998 :         relkind = get_rel_relkind(subrel->srrelid);
     666                 :             :         Assert(relkind == RELKIND_SEQUENCE || relkind == RELKIND_RELATION ||
     667                 :             :                relkind == RELKIND_PARTITIONED_TABLE);
     668                 :             : 
     669                 :             :         /* Skip sequences if they were not requested */
     670   [ +  +  -  + ]:        1998 :         if ((relkind == RELKIND_SEQUENCE) && !sequences)
     671                 :           0 :             continue;
     672                 :             : 
     673                 :             :         /* Skip tables if they were not requested */
     674   [ +  +  +  + ]:        1998 :         if ((relkind == RELKIND_RELATION ||
     675         [ -  + ]:        1950 :              relkind == RELKIND_PARTITIONED_TABLE) && !tables)
     676                 :           0 :             continue;
     677                 :             : 
     678                 :        1998 :         relstate = palloc_object(SubscriptionRelState);
     679                 :        1998 :         relstate->relid = subrel->srrelid;
     680                 :        1998 :         relstate->state = subrel->srsubstate;
     681                 :        1998 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     682                 :             :                             Anum_pg_subscription_rel_srsublsn, &isnull);
     683         [ +  + ]:        1998 :         if (isnull)
     684                 :        1628 :             relstate->lsn = InvalidXLogRecPtr;
     685                 :             :         else
     686                 :         370 :             relstate->lsn = DatumGetLSN(d);
     687                 :             : 
     688                 :        1998 :         res = lappend(res, relstate);
     689                 :             :     }
     690                 :             : 
     691                 :             :     /* Cleanup */
     692                 :        1229 :     systable_endscan(scan);
     693                 :        1229 :     table_close(rel, AccessShareLock);
     694                 :             : 
     695                 :        1229 :     return res;
     696                 :             : }
     697                 :             : 
     698                 :             : /*
     699                 :             :  * Update the dead tuple retention status for the given subscription.
     700                 :             :  */
     701                 :             : void
     702                 :           2 : UpdateDeadTupleRetentionStatus(Oid subid, bool active)
     703                 :             : {
     704                 :             :     Relation    rel;
     705                 :             :     bool        nulls[Natts_pg_subscription];
     706                 :             :     bool        replaces[Natts_pg_subscription];
     707                 :             :     Datum       values[Natts_pg_subscription];
     708                 :             :     HeapTuple   tup;
     709                 :             : 
     710                 :             :     /* Look up the subscription in the catalog */
     711                 :           2 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     712                 :           2 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     713                 :             : 
     714         [ -  + ]:           2 :     if (!HeapTupleIsValid(tup))
     715         [ #  # ]:           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     716                 :             : 
     717                 :           2 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     718                 :             : 
     719                 :             :     /* Form a new tuple. */
     720                 :           2 :     memset(values, 0, sizeof(values));
     721                 :           2 :     memset(nulls, false, sizeof(nulls));
     722                 :           2 :     memset(replaces, false, sizeof(replaces));
     723                 :             : 
     724                 :             :     /* Set the subscription to disabled. */
     725                 :           2 :     values[Anum_pg_subscription_subretentionactive - 1] = BoolGetDatum(active);
     726                 :           2 :     replaces[Anum_pg_subscription_subretentionactive - 1] = true;
     727                 :             : 
     728                 :             :     /* Update the catalog */
     729                 :           2 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     730                 :             :                             replaces);
     731                 :           2 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     732                 :           2 :     heap_freetuple(tup);
     733                 :             : 
     734                 :           2 :     table_close(rel, NoLock);
     735                 :           2 : }
        

Generated by: LCOV version 2.0-1