LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 165 175 94.3 %
Date: 2023-12-11 16:10:55 Functions: 11 11 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_subscription.c
       4             :  *      replication subscriptions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_subscription.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "access/xact.h"
      22             : #include "catalog/indexing.h"
      23             : #include "catalog/pg_subscription.h"
      24             : #include "catalog/pg_subscription_rel.h"
      25             : #include "catalog/pg_type.h"
      26             : #include "miscadmin.h"
      27             : #include "nodes/makefuncs.h"
      28             : #include "storage/lmgr.h"
      29             : #include "utils/array.h"
      30             : #include "utils/builtins.h"
      31             : #include "utils/fmgroids.h"
      32             : #include "utils/lsyscache.h"
      33             : #include "utils/pg_lsn.h"
      34             : #include "utils/rel.h"
      35             : #include "utils/syscache.h"
      36             : 
      37             : static List *textarray_to_stringlist(ArrayType *textarray);
      38             : 
      39             : /*
      40             :  * Fetch the subscription with OID subid from the syscache and return it as a palloc'd Subscription.  If no such entry exists, return NULL when missing_ok is true, otherwise raise an ERROR.
      41             :  */
      42             : Subscription *
      43        1226 : GetSubscription(Oid subid, bool missing_ok)
      44             : {
      45             :     HeapTuple   tup;
      46             :     Subscription *sub;
      47             :     Form_pg_subscription subform;
      48             :     Datum       datum;
      49             :     bool        isnull;
      50             : 
      51        1226 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      52             : 
      53        1226 :     if (!HeapTupleIsValid(tup))
      54             :     {
      55           0 :         if (missing_ok)
      56           0 :             return NULL;
      57             : 
      58           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
      59             :     }
      60             :     /* Copy the fields that are reachable directly through the catalog struct. */
      61        1226 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
      62             : 
      63        1226 :     sub = (Subscription *) palloc(sizeof(Subscription));
      64        1226 :     sub->oid = subid;
      65        1226 :     sub->dbid = subform->subdbid;
      66        1226 :     sub->skiplsn = subform->subskiplsn;
      67        1226 :     sub->name = pstrdup(NameStr(subform->subname));
      68        1226 :     sub->owner = subform->subowner;
      69        1226 :     sub->enabled = subform->subenabled;
      70        1226 :     sub->binary = subform->subbinary;
      71        1226 :     sub->stream = subform->substream;
      72        1226 :     sub->twophasestate = subform->subtwophasestate;
      73        1226 :     sub->disableonerr = subform->subdisableonerr;
      74        1226 :     sub->passwordrequired = subform->subpasswordrequired;
      75        1226 :     sub->runasowner = subform->subrunasowner;
      76             : 
      77             :     /* Get conninfo (fetched with the NotNull variant, so no isnull check is made here) */
      78        1226 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
      79             :                                    tup,
      80             :                                    Anum_pg_subscription_subconninfo);
      81        1226 :     sub->conninfo = TextDatumGetCString(datum);
      82             : 
      83             :     /* Get slotname; this attribute may be null, in which case sub->slotname is set to NULL */
      84        1226 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
      85             :                             tup,
      86             :                             Anum_pg_subscription_subslotname,
      87             :                             &isnull);
      88        1226 :     if (!isnull)
      89        1160 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
      90             :     else
      91          66 :         sub->slotname = NULL;
      92             : 
      93             :     /* Get synccommit */
      94        1226 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
      95             :                                    tup,
      96             :                                    Anum_pg_subscription_subsynccommit);
      97        1226 :     sub->synccommit = TextDatumGetCString(datum);
      98             : 
      99             :     /* Get publications: the array attribute is converted to a List by textarray_to_stringlist() */
     100        1226 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     101             :                                    tup,
     102             :                                    Anum_pg_subscription_subpublications);
     103        1226 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     104             : 
     105             :     /* Get origin */
     106        1226 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     107             :                                    tup,
     108             :                                    Anum_pg_subscription_suborigin);
     109        1226 :     sub->origin = TextDatumGetCString(datum);
     110             : 
     111             :     /* Is the subscription owner a superuser? */
     112        1226 :     sub->ownersuperuser = superuser_arg(sub->owner);
     113             :     /* Done reading from tup; release the syscache reference taken above. */
     114        1226 :     ReleaseSysCache(tup);
     115             : 
     116        1226 :     return sub;
     117             : }
     118             : 
     119             : /*
     120             :  * Return the number of pg_subscription rows whose subdbid equals dbid.
     121             :  * Used by dropdb() to check if database can indeed be dropped.
     122             :  */
     123             : int
     124          50 : CountDBSubscriptions(Oid dbid)
     125             : {
     126          50 :     int         nsubs = 0;
     127             :     Relation    rel;
     128             :     ScanKeyData scankey;
     129             :     SysScanDesc scan;
     130             :     HeapTuple   tup;
     131             :     /* NOTE(review): the catalog is only read below, yet RowExclusiveLock is taken -- confirm this lock level is intended. */
     132          50 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     133             :     /* Match only rows with subdbid = dbid. */
     134          50 :     ScanKeyInit(&scankey,
     135             :                 Anum_pg_subscription_subdbid,
     136             :                 BTEqualStrategyNumber, F_OIDEQ,
     137             :                 ObjectIdGetDatum(dbid));
     138             :     /* InvalidOid/false are passed for the index arguments, so presumably no index is used -- confirm. */
     139          50 :     scan = systable_beginscan(rel, InvalidOid, false,
     140             :                               NULL, 1, &scankey);
     141             :     /* Count every matching tuple; the tuple contents are not examined. */
     142          50 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     143           0 :         nsubs++;
     144             : 
     145          50 :     systable_endscan(scan);
     146             :     /* NoLock is passed, so the lock taken by table_open is not released here. */
     147          50 :     table_close(rel, NoLock);
     148             : 
     149          50 :     return nsubs;
     150             : }
     151             : 
     152             : /*
     153             :  * Free a Subscription struct along with its name, conninfo, slotname (if set) and publications list.  NOTE(review): the synccommit and origin strings that GetSubscription() fills in are not freed here -- confirm whether that is intentional.
     154             :  */
     155             : void
     156          60 : FreeSubscription(Subscription *sub)
     157             : {
     158          60 :     pfree(sub->name);
     159          60 :     pfree(sub->conninfo);
     160          60 :     if (sub->slotname)          /* slotname may be NULL, see GetSubscription() */
     161          60 :         pfree(sub->slotname);
     162          60 :     list_free_deep(sub->publications);  /* frees the list and its elements */
     163          60 :     pfree(sub);
     164          60 : }
     165             : 
     166             : /*
     167             :  * Disable the given subscription by setting subenabled to false in its pg_subscription row.  Raises an ERROR if no subscription with OID subid is found.
     168             :  */
     169             : void
     170           8 : DisableSubscription(Oid subid)
     171             : {
     172             :     Relation    rel;
     173             :     bool        nulls[Natts_pg_subscription];
     174             :     bool        replaces[Natts_pg_subscription];
     175             :     Datum       values[Natts_pg_subscription];
     176             :     HeapTuple   tup;
     177             : 
     178             :     /* Look up the subscription in the catalog (a modifiable copy of the syscache tuple) */
     179           8 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     180           8 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     181             : 
     182           8 :     if (!HeapTupleIsValid(tup))
     183           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     184             :     /* Take a shared-object lock on the subscription itself before modifying its row. */
     185           8 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     186             : 
     187             :     /* Form a new tuple: start with no column marked for replacement. */
     188           8 :     memset(values, 0, sizeof(values));
     189           8 :     memset(nulls, false, sizeof(nulls));
     190           8 :     memset(replaces, false, sizeof(replaces));
     191             : 
     192             :     /* Set the subscription to disabled; subenabled is the only column replaced. */
     193           8 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     194           8 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
     195             : 
     196             :     /* Update the catalog.  NOTE(review): tup is reassigned to the modified tuple, so only that one is freed below -- confirm the original copy need not be freed. */
     197           8 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     198             :                             replaces);
     199           8 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     200           8 :     heap_freetuple(tup);
     201             :     /* NoLock is passed, so the lock taken by table_open is not released here. */
     202           8 :     table_close(rel, NoLock);
     203           8 : }
     204             : 
     205             : /*
     206             :  * Convert text array to list of strings, one makeString() node per array element, in array order.
     207             :  * Returns NIL when the array has no elements.
     208             :  * Note: the resulting list of strings is pallocated here.
     209             :  */
     210             : static List *
     211        1226 : textarray_to_stringlist(ArrayType *textarray)
     212             : {
     213             :     Datum      *elems;
     214             :     int         nelems,
     215             :                 i;
     216        1226 :     List       *res = NIL;
     217             :     /* Passing NULL for the nulls output: null flags of elements are not requested. */
     218        1226 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
     219             :     /* Empty array: res is still NIL here, so this early exit returns the same value the loop would. */
     220        1226 :     if (nelems == 0)
     221           0 :         return NIL;
     222             : 
     223        3126 :     for (i = 0; i < nelems; i++)
     224        1900 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     225             : 
     226        1226 :     return res;
     227             : }
     228             : 
     229             : /*
     230             :  * Add new state record for a subscription table: insert a pg_subscription_rel row for (subid, relid) with the given state and LSN.  Raises an ERROR if such a row already exists.  An invalid sublsn is stored as NULL.
     231             :  */
     232             : void
     233         332 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     234             :                         XLogRecPtr sublsn)
     235             : {
     236             :     Relation    rel;
     237             :     HeapTuple   tup;
     238             :     bool        nulls[Natts_pg_subscription_rel];
     239             :     Datum       values[Natts_pg_subscription_rel];
     240             :     /* Lock the owning subscription object before touching its relation mappings. */
     241         332 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     242             : 
     243         332 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     244             : 
     245             :     /* Try finding existing mapping; note the cache keys are passed as (relid, subid). */
     246         332 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     247             :                               ObjectIdGetDatum(relid),
     248             :                               ObjectIdGetDatum(subid));
     249         332 :     if (HeapTupleIsValid(tup))
     250           0 :         elog(ERROR, "subscription table %u in subscription %u already exists",
     251             :              relid, subid);
     252             : 
     253             :     /* Form the tuple. */
     254         332 :     memset(values, 0, sizeof(values));
     255         332 :     memset(nulls, false, sizeof(nulls));
     256         332 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     257         332 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     258         332 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     259         332 :     if (sublsn != InvalidXLogRecPtr)
     260           0 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     261             :     else
     262         332 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;    /* invalid LSN is stored as NULL */
     263             : 
     264         332 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     265             : 
     266             :     /* Insert tuple into catalog. */
     267         332 :     CatalogTupleInsert(rel, tup);
     268             : 
     269         332 :     heap_freetuple(tup);
     270             : 
     271             :     /* Cleanup.  NoLock is passed, so the lock taken by table_open is not released here. */
     272         332 :     table_close(rel, NoLock);
     273         332 : }
     274             : 
     275             : /*
     276             :  * Update the state of a subscription table: overwrite srsubstate and srsublsn of the existing (subid, relid) row.  Raises an ERROR if the row does not exist.  An invalid sublsn is stored as NULL.
     277             :  */
     278             : void
     279        1260 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     280             :                            XLogRecPtr sublsn)
     281             : {
     282             :     Relation    rel;
     283             :     HeapTuple   tup;
     284             :     bool        nulls[Natts_pg_subscription_rel];
     285             :     Datum       values[Natts_pg_subscription_rel];
     286             :     bool        replaces[Natts_pg_subscription_rel];
     287             :     /* Lock the owning subscription object before touching its relation mappings. */
     288        1260 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     289             : 
     290        1260 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     291             : 
     292             :     /* Try finding existing mapping; note the cache keys are passed as (relid, subid). */
     293        1260 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     294             :                               ObjectIdGetDatum(relid),
     295             :                               ObjectIdGetDatum(subid));
     296        1260 :     if (!HeapTupleIsValid(tup))
     297           0 :         elog(ERROR, "subscription table %u in subscription %u does not exist",
     298             :              relid, subid);
     299             : 
     300             :     /* Update the tuple: only the state and LSN columns are marked for replacement. */
     301        1260 :     memset(values, 0, sizeof(values));
     302        1260 :     memset(nulls, false, sizeof(nulls));
     303        1260 :     memset(replaces, false, sizeof(replaces));
     304             : 
     305        1260 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     306        1260 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     307             :     /* The LSN column is always replaced; an invalid LSN becomes NULL. */
     308        1260 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     309        1260 :     if (sublsn != InvalidXLogRecPtr)
     310         618 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     311             :     else
     312         642 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     313             : 
     314        1260 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     315             :                             replaces);
     316             : 
     317             :     /* Update the catalog. */
     318        1260 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     319             : 
     320             :     /* Cleanup.  NOTE(review): unlike AddSubscriptionRelState(), no heap_freetuple() is called on tup here -- confirm this is fine. */
     321        1260 :     table_close(rel, NoLock);
     322        1260 : }
     323             : 
     324             : /*
     325             :  * Get state of subscription table; the row's LSN is returned through *sublsn (InvalidXLogRecPtr if the column is null).
     326             :  * sublsn is written unconditionally, so it must point to valid storage.
     327             :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     328             :  */
     329             : char
     330        1986 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     331             : {
     332             :     HeapTuple   tup;
     333             :     char        substate;
     334             :     bool        isnull;
     335             :     Datum       d;
     336             :     Relation    rel;
     337             : 
     338             :     /*
     339             :      * This is to avoid the race condition with AlterSubscription which tries
     340             :      * to remove this relstate.
     341             :      */
     342        1986 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     343             : 
     344             :     /* Try finding the mapping; note the cache keys are passed as (relid, subid). */
     345        1986 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     346             :                           ObjectIdGetDatum(relid),
     347             :                           ObjectIdGetDatum(subid));
     348             :     /* No mapping: report an unknown state and an invalid LSN. */
     349        1986 :     if (!HeapTupleIsValid(tup))
     350             :     {
     351          50 :         table_close(rel, AccessShareLock);
     352          50 :         *sublsn = InvalidXLogRecPtr;
     353          50 :         return SUBREL_STATE_UNKNOWN;
     354             :     }
     355             : 
     356             :     /* Get the state. */
     357        1936 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     358             : 
     359             :     /* Get the LSN; a null column is reported as InvalidXLogRecPtr */
     360        1936 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     361             :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     362        1936 :     if (isnull)
     363        1018 :         *sublsn = InvalidXLogRecPtr;
     364             :     else
     365         918 :         *sublsn = DatumGetLSN(d);
     366             : 
     367             :     /* Cleanup */
     368        1936 :     ReleaseSysCache(tup);
     369             :     /* Here the same lock mode used at table_open is passed to table_close (not NoLock). */
     370        1936 :     table_close(rel, AccessShareLock);
     371             : 
     372        1936 :     return substate;
     373             : }
     374             : 
     375             : /*
     376             :  * Drop subscription relation mapping. These can be for a particular
     377             :  * subscription, or for a particular relation, or both.  An invalid OID for either argument means "do not filter on it".
     378             :  */
     379             : void
     380       39070 : RemoveSubscriptionRel(Oid subid, Oid relid)
     381             : {
     382             :     Relation    rel;
     383             :     TableScanDesc scan;
     384             :     ScanKeyData skey[2];
     385             :     HeapTuple   tup;
     386       39070 :     int         nkeys = 0;
     387             : 
     388       39070 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     389             :     /* Build up to two scan keys; if both OIDs are invalid nkeys stays 0 (presumably matching every row -- confirm). */
     390       39070 :     if (OidIsValid(subid))
     391             :     {
     392         208 :         ScanKeyInit(&skey[nkeys++],
     393             :                     Anum_pg_subscription_rel_srsubid,
     394             :                     BTEqualStrategyNumber,
     395             :                     F_OIDEQ,
     396             :                     ObjectIdGetDatum(subid));
     397             :     }
     398             : 
     399       39070 :     if (OidIsValid(relid))
     400             :     {
     401       38924 :         ScanKeyInit(&skey[nkeys++],
     402             :                     Anum_pg_subscription_rel_srrelid,
     403             :                     BTEqualStrategyNumber,
     404             :                     F_OIDEQ,
     405             :                     ObjectIdGetDatum(relid));
     406             :     }
     407             : 
     408             :     /* Do the search and delete what we found. */
     409       39070 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     410       39240 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     411             :     {
     412             :         Form_pg_subscription_rel subrel;
     413             : 
     414         170 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     415             : 
     416             :         /*
     417             :          * We don't allow dropping the relation mapping while table
     418             :          * synchronization is in progress (state is not SUBREL_STATE_READY)
     419             :          * unless the caller identifies the subscription as well (valid subid).
     420             :          * This is to ensure that we don't leave tablesync slots or origins in
     421             :          * the system when the corresponding table is dropped.
     422             :          */
     423         170 :         if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
     424             :         {
     425           0 :             ereport(ERROR,
     426             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     427             :                      errmsg("could not drop relation mapping for subscription \"%s\"",
     428             :                             get_subscription_name(subrel->srsubid, false)),
     429             :                      errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
     430             :                                get_rel_name(relid), subrel->srsubstate),
     431             : 
     432             :             /*
     433             :              * translator: first %s is a SQL ALTER command and second %s is a
     434             :              * SQL DROP command
     435             :              */
     436             :                      errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
     437             :                              "ALTER SUBSCRIPTION ... ENABLE",
     438             :                              "DROP SUBSCRIPTION ...")));
     439             :         }
     440             : 
     441         170 :         CatalogTupleDelete(rel, &tup->t_self);
     442             :     }
     443       39070 :     table_endscan(scan);
     444             :     /* Here the same lock mode used at table_open is passed to table_close (not NoLock). */
     445       39070 :     table_close(rel, RowExclusiveLock);
     446       39070 : }
     447             : 
     448             : /*
     449             :  * Does the subscription have any relations?  Returns true if at least one pg_subscription_rel row has srsubid equal to subid.
     450             :  *
     451             :  * Use this function only to know true/false, and when you have no need for the
     452             :  * List returned by GetSubscriptionRelations.
     453             :  */
     454             : bool
     455         368 : HasSubscriptionRelations(Oid subid)
     456             : {
     457             :     Relation    rel;
     458             :     ScanKeyData skey[1];
     459             :     SysScanDesc scan;
     460             :     bool        has_subrels;
     461             : 
     462         368 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     463             :     /* Match only rows with srsubid = subid. */
     464         368 :     ScanKeyInit(&skey[0],
     465             :                 Anum_pg_subscription_rel_srsubid,
     466             :                 BTEqualStrategyNumber, F_OIDEQ,
     467             :                 ObjectIdGetDatum(subid));
     468             :     /* InvalidOid/false are passed for the index arguments, so presumably no index is used -- confirm. */
     469         368 :     scan = systable_beginscan(rel, InvalidOid, false,
     470             :                               NULL, 1, skey);
     471             : 
     472             :     /* If even a single tuple exists then the subscription has tables; only the first fetch is attempted. */
     473         368 :     has_subrels = HeapTupleIsValid(systable_getnext(scan));
     474             : 
     475             :     /* Cleanup */
     476         368 :     systable_endscan(scan);
     477         368 :     table_close(rel, AccessShareLock);
     478             : 
     479         368 :     return has_subrels;
     480             : }
     481             : 
     482             : /*
     483             :  * Get the relations for the subscription.
     484             :  * Each list element is a palloc'd SubscriptionRelState carrying relid, state and lsn.
     485             :  * If not_ready is true, return only the relations that are not in a ready
     486             :  * state, otherwise return all the relations of the subscription.  The
     487             :  * returned list is palloc'ed in the current memory context.
     488             :  */
     489             : List *
     490        1602 : GetSubscriptionRelations(Oid subid, bool not_ready)
     491             : {
     492        1602 :     List       *res = NIL;
     493             :     Relation    rel;
     494             :     HeapTuple   tup;
     495        1602 :     int         nkeys = 0;
     496             :     ScanKeyData skey[2];
     497             :     SysScanDesc scan;
     498             : 
     499        1602 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     500             :     /* First key: rows belonging to this subscription. */
     501        1602 :     ScanKeyInit(&skey[nkeys++],
     502             :                 Anum_pg_subscription_rel_srsubid,
     503             :                 BTEqualStrategyNumber, F_OIDEQ,
     504             :                 ObjectIdGetDatum(subid));
     505             :     /* Optional second key: F_CHARNE against SUBREL_STATE_READY (the strategy argument is still BTEqualStrategyNumber). */
     506        1602 :     if (not_ready)
     507        1526 :         ScanKeyInit(&skey[nkeys++],
     508             :                     Anum_pg_subscription_rel_srsubstate,
     509             :                     BTEqualStrategyNumber, F_CHARNE,
     510             :                     CharGetDatum(SUBREL_STATE_READY));
     511             : 
     512        1602 :     scan = systable_beginscan(rel, InvalidOid, false,
     513             :                               NULL, nkeys, skey);
     514             :     /* Build one SubscriptionRelState per matching row, appended in scan order. */
     515        3914 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     516             :     {
     517             :         Form_pg_subscription_rel subrel;
     518             :         SubscriptionRelState *relstate;
     519             :         Datum       d;
     520             :         bool        isnull;
     521             : 
     522        2312 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     523             : 
     524        2312 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
     525        2312 :         relstate->relid = subrel->srrelid;
     526        2312 :         relstate->state = subrel->srsubstate;
     527        2312 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,   /* NOTE(review): tup comes from the scan, not a syscache lookup; the cache id is presumably used only for the tuple layout -- confirm */
     528             :                             Anum_pg_subscription_rel_srsublsn, &isnull);
     529        2312 :         if (isnull)
     530        1816 :             relstate->lsn = InvalidXLogRecPtr;
     531             :         else
     532         496 :             relstate->lsn = DatumGetLSN(d);
     533             : 
     534        2312 :         res = lappend(res, relstate);
     535             :     }
     536             : 
     537             :     /* Cleanup */
     538        1602 :     systable_endscan(scan);
     539        1602 :     table_close(rel, AccessShareLock);
     540             : 
     541        1602 :     return res;
     542             : }

Generated by: LCOV version 1.14