LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 170 179 95.0 %
Date: 2024-04-20 11:11:13 Functions: 11 11 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_subscription.c
       4             :  *      replication subscriptions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_subscription.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "catalog/indexing.h"
      22             : #include "catalog/pg_subscription.h"
      23             : #include "catalog/pg_subscription_rel.h"
      24             : #include "catalog/pg_type.h"
      25             : #include "miscadmin.h"
      26             : #include "storage/lmgr.h"
      27             : #include "utils/array.h"
      28             : #include "utils/builtins.h"
      29             : #include "utils/fmgroids.h"
      30             : #include "utils/lsyscache.h"
      31             : #include "utils/pg_lsn.h"
      32             : #include "utils/rel.h"
      33             : #include "utils/syscache.h"
      34             : 
      35             : static List *textarray_to_stringlist(ArrayType *textarray);
      36             : 
      37             : /*
      38             :  * Fetch the subscription from the syscache.
      39             :  */
      40             : Subscription *
      41        1288 : GetSubscription(Oid subid, bool missing_ok)
      42             : {
      43             :     HeapTuple   tup;
      44             :     Subscription *sub;
      45             :     Form_pg_subscription subform;
      46             :     Datum       datum;
      47             :     bool        isnull;
      48             : 
      49        1288 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      50             : 
      51        1284 :     if (!HeapTupleIsValid(tup))
      52             :     {
      53           0 :         if (missing_ok)
      54           0 :             return NULL;
      55             : 
      56           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
      57             :     }
      58             : 
      59        1284 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
      60             : 
      61        1284 :     sub = (Subscription *) palloc(sizeof(Subscription));
      62        1284 :     sub->oid = subid;
      63        1284 :     sub->dbid = subform->subdbid;
      64        1284 :     sub->skiplsn = subform->subskiplsn;
      65        1284 :     sub->name = pstrdup(NameStr(subform->subname));
      66        1284 :     sub->owner = subform->subowner;
      67        1284 :     sub->enabled = subform->subenabled;
      68        1284 :     sub->binary = subform->subbinary;
      69        1284 :     sub->stream = subform->substream;
      70        1284 :     sub->twophasestate = subform->subtwophasestate;
      71        1284 :     sub->disableonerr = subform->subdisableonerr;
      72        1284 :     sub->passwordrequired = subform->subpasswordrequired;
      73        1284 :     sub->runasowner = subform->subrunasowner;
      74        1284 :     sub->failover = subform->subfailover;
      75             : 
      76             :     /* Get conninfo */
      77        1284 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
      78             :                                    tup,
      79             :                                    Anum_pg_subscription_subconninfo);
      80        1284 :     sub->conninfo = TextDatumGetCString(datum);
      81             : 
      82             :     /* Get slotname */
      83        1284 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
      84             :                             tup,
      85             :                             Anum_pg_subscription_subslotname,
      86             :                             &isnull);
      87        1284 :     if (!isnull)
      88        1218 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
      89             :     else
      90          66 :         sub->slotname = NULL;
      91             : 
      92             :     /* Get synccommit */
      93        1284 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
      94             :                                    tup,
      95             :                                    Anum_pg_subscription_subsynccommit);
      96        1284 :     sub->synccommit = TextDatumGetCString(datum);
      97             : 
      98             :     /* Get publications */
      99        1284 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     100             :                                    tup,
     101             :                                    Anum_pg_subscription_subpublications);
     102        1284 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     103             : 
     104             :     /* Get origin */
     105        1284 :     datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
     106             :                                    tup,
     107             :                                    Anum_pg_subscription_suborigin);
     108        1284 :     sub->origin = TextDatumGetCString(datum);
     109             : 
     110             :     /* Is the subscription owner a superuser? */
     111        1284 :     sub->ownersuperuser = superuser_arg(sub->owner);
     112             : 
     113        1284 :     ReleaseSysCache(tup);
     114             : 
     115        1284 :     return sub;
     116             : }
     117             : 
     118             : /*
     119             :  * Return number of subscriptions defined in given database.
     120             :  * Used by dropdb() to check if database can indeed be dropped.
     121             :  */
     122             : int
     123          62 : CountDBSubscriptions(Oid dbid)
     124             : {
     125          62 :     int         nsubs = 0;
     126             :     Relation    rel;
     127             :     ScanKeyData scankey;
     128             :     SysScanDesc scan;
     129             :     HeapTuple   tup;
     130             : 
     131          62 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     132             : 
     133          62 :     ScanKeyInit(&scankey,
     134             :                 Anum_pg_subscription_subdbid,
     135             :                 BTEqualStrategyNumber, F_OIDEQ,
     136             :                 ObjectIdGetDatum(dbid));
     137             : 
     138          62 :     scan = systable_beginscan(rel, InvalidOid, false,
     139             :                               NULL, 1, &scankey);
     140             : 
     141          62 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     142           0 :         nsubs++;
     143             : 
     144          62 :     systable_endscan(scan);
     145             : 
     146          62 :     table_close(rel, NoLock);
     147             : 
     148          62 :     return nsubs;
     149             : }
     150             : 
     151             : /*
     152             :  * Free memory allocated by subscription struct.
     153             :  */
     154             : void
     155          64 : FreeSubscription(Subscription *sub)
     156             : {
     157          64 :     pfree(sub->name);
     158          64 :     pfree(sub->conninfo);
     159          64 :     if (sub->slotname)
     160          64 :         pfree(sub->slotname);
     161          64 :     list_free_deep(sub->publications);
     162          64 :     pfree(sub);
     163          64 : }
     164             : 
     165             : /*
     166             :  * Disable the given subscription.
     167             :  */
     168             : void
     169           8 : DisableSubscription(Oid subid)
     170             : {
     171             :     Relation    rel;
     172             :     bool        nulls[Natts_pg_subscription];
     173             :     bool        replaces[Natts_pg_subscription];
     174             :     Datum       values[Natts_pg_subscription];
     175             :     HeapTuple   tup;
     176             : 
     177             :     /* Look up the subscription in the catalog */
     178           8 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     179           8 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     180             : 
     181           8 :     if (!HeapTupleIsValid(tup))
     182           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
     183             : 
     184           8 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     185             : 
     186             :     /* Form a new tuple. */
     187           8 :     memset(values, 0, sizeof(values));
     188           8 :     memset(nulls, false, sizeof(nulls));
     189           8 :     memset(replaces, false, sizeof(replaces));
     190             : 
     191             :     /* Set the subscription to disabled. */
     192           8 :     values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
     193           8 :     replaces[Anum_pg_subscription_subenabled - 1] = true;
     194             : 
     195             :     /* Update the catalog */
     196           8 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     197             :                             replaces);
     198           8 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     199           8 :     heap_freetuple(tup);
     200             : 
     201           8 :     table_close(rel, NoLock);
     202           8 : }
     203             : 
     204             : /*
     205             :  * Convert text array to list of strings.
     206             :  *
     207             :  * Note: the resulting list of strings is pallocated here.
     208             :  */
     209             : static List *
     210        1284 : textarray_to_stringlist(ArrayType *textarray)
     211             : {
     212             :     Datum      *elems;
     213             :     int         nelems,
     214             :                 i;
     215        1284 :     List       *res = NIL;
     216             : 
     217        1284 :     deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
     218             : 
     219        1284 :     if (nelems == 0)
     220           0 :         return NIL;
     221             : 
     222        3228 :     for (i = 0; i < nelems; i++)
     223        1944 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     224             : 
     225        1284 :     return res;
     226             : }
     227             : 
     228             : /*
     229             :  * Add new state record for a subscription table.
     230             :  *
     231             :  * If retain_lock is true, then don't release the locks taken in this function.
     232             :  * We normally release the locks at the end of transaction but in binary-upgrade
     233             :  * mode, we expect to release those immediately.
     234             :  */
     235             : void
     236         350 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     237             :                         XLogRecPtr sublsn, bool retain_lock)
     238             : {
     239             :     Relation    rel;
     240             :     HeapTuple   tup;
     241             :     bool        nulls[Natts_pg_subscription_rel];
     242             :     Datum       values[Natts_pg_subscription_rel];
     243             : 
     244         350 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     245             : 
     246         350 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     247             : 
     248             :     /* Try finding existing mapping. */
     249         350 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     250             :                               ObjectIdGetDatum(relid),
     251             :                               ObjectIdGetDatum(subid));
     252         350 :     if (HeapTupleIsValid(tup))
     253           0 :         elog(ERROR, "subscription table %u in subscription %u already exists",
     254             :              relid, subid);
     255             : 
     256             :     /* Form the tuple. */
     257         350 :     memset(values, 0, sizeof(values));
     258         350 :     memset(nulls, false, sizeof(nulls));
     259         350 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     260         350 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     261         350 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     262         350 :     if (sublsn != InvalidXLogRecPtr)
     263           2 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     264             :     else
     265         348 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     266             : 
     267         350 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     268             : 
     269             :     /* Insert tuple into catalog. */
     270         350 :     CatalogTupleInsert(rel, tup);
     271             : 
     272         350 :     heap_freetuple(tup);
     273             : 
     274             :     /* Cleanup. */
     275         350 :     if (retain_lock)
     276             :     {
     277         346 :         table_close(rel, NoLock);
     278             :     }
     279             :     else
     280             :     {
     281           4 :         table_close(rel, RowExclusiveLock);
     282           4 :         UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     283             :     }
     284         350 : }
     285             : 
     286             : /*
     287             :  * Update the state of a subscription table.
     288             :  */
     289             : void
     290        1294 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     291             :                            XLogRecPtr sublsn)
     292             : {
     293             :     Relation    rel;
     294             :     HeapTuple   tup;
     295             :     bool        nulls[Natts_pg_subscription_rel];
     296             :     Datum       values[Natts_pg_subscription_rel];
     297             :     bool        replaces[Natts_pg_subscription_rel];
     298             : 
     299        1294 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     300             : 
     301        1294 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     302             : 
     303             :     /* Try finding existing mapping. */
     304        1294 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     305             :                               ObjectIdGetDatum(relid),
     306             :                               ObjectIdGetDatum(subid));
     307        1294 :     if (!HeapTupleIsValid(tup))
     308           0 :         elog(ERROR, "subscription table %u in subscription %u does not exist",
     309             :              relid, subid);
     310             : 
     311             :     /* Update the tuple. */
     312        1294 :     memset(values, 0, sizeof(values));
     313        1294 :     memset(nulls, false, sizeof(nulls));
     314        1294 :     memset(replaces, false, sizeof(replaces));
     315             : 
     316        1294 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     317        1294 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     318             : 
     319        1294 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     320        1294 :     if (sublsn != InvalidXLogRecPtr)
     321         636 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     322             :     else
     323         658 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     324             : 
     325        1294 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     326             :                             replaces);
     327             : 
     328             :     /* Update the catalog. */
     329        1294 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     330             : 
     331             :     /* Cleanup. */
     332        1294 :     table_close(rel, NoLock);
     333        1294 : }
     334             : 
     335             : /*
     336             :  * Get state of subscription table.
     337             :  *
     338             :  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
     339             :  */
     340             : char
     341        2028 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
     342             : {
     343             :     HeapTuple   tup;
     344             :     char        substate;
     345             :     bool        isnull;
     346             :     Datum       d;
     347             :     Relation    rel;
     348             : 
     349             :     /*
     350             :      * This is to avoid the race condition with AlterSubscription which tries
     351             :      * to remove this relstate.
     352             :      */
     353        2028 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     354             : 
     355             :     /* Try finding the mapping. */
     356        2028 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     357             :                           ObjectIdGetDatum(relid),
     358             :                           ObjectIdGetDatum(subid));
     359             : 
     360        2028 :     if (!HeapTupleIsValid(tup))
     361             :     {
     362          52 :         table_close(rel, AccessShareLock);
     363          52 :         *sublsn = InvalidXLogRecPtr;
     364          52 :         return SUBREL_STATE_UNKNOWN;
     365             :     }
     366             : 
     367             :     /* Get the state. */
     368        1976 :     substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
     369             : 
     370             :     /* Get the LSN */
     371        1976 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     372             :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     373        1976 :     if (isnull)
     374        1062 :         *sublsn = InvalidXLogRecPtr;
     375             :     else
     376         914 :         *sublsn = DatumGetLSN(d);
     377             : 
     378             :     /* Cleanup */
     379        1976 :     ReleaseSysCache(tup);
     380             : 
     381        1976 :     table_close(rel, AccessShareLock);
     382             : 
     383        1976 :     return substate;
     384             : }
     385             : 
     386             : /*
     387             :  * Drop subscription relation mapping. These can be for a particular
     388             :  * subscription, or for a particular relation, or both.
     389             :  */
     390             : void
     391       42534 : RemoveSubscriptionRel(Oid subid, Oid relid)
     392             : {
     393             :     Relation    rel;
     394             :     TableScanDesc scan;
     395             :     ScanKeyData skey[2];
     396             :     HeapTuple   tup;
     397       42534 :     int         nkeys = 0;
     398             : 
     399       42534 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     400             : 
     401       42534 :     if (OidIsValid(subid))
     402             :     {
     403         210 :         ScanKeyInit(&skey[nkeys++],
     404             :                     Anum_pg_subscription_rel_srsubid,
     405             :                     BTEqualStrategyNumber,
     406             :                     F_OIDEQ,
     407             :                     ObjectIdGetDatum(subid));
     408             :     }
     409             : 
     410       42534 :     if (OidIsValid(relid))
     411             :     {
     412       42360 :         ScanKeyInit(&skey[nkeys++],
     413             :                     Anum_pg_subscription_rel_srrelid,
     414             :                     BTEqualStrategyNumber,
     415             :                     F_OIDEQ,
     416             :                     ObjectIdGetDatum(relid));
     417             :     }
     418             : 
     419             :     /* Do the search and delete what we found. */
     420       42534 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     421       42706 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     422             :     {
     423             :         Form_pg_subscription_rel subrel;
     424             : 
     425         172 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     426             : 
     427             :         /*
     428             :          * We don't allow to drop the relation mapping when the table
     429             :          * synchronization is in progress unless the caller updates the
     430             :          * corresponding subscription as well. This is to ensure that we don't
     431             :          * leave tablesync slots or origins in the system when the
     432             :          * corresponding table is dropped.
     433             :          */
     434         172 :         if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
     435             :         {
     436           0 :             ereport(ERROR,
     437             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     438             :                      errmsg("could not drop relation mapping for subscription \"%s\"",
     439             :                             get_subscription_name(subrel->srsubid, false)),
     440             :                      errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
     441             :                                get_rel_name(relid), subrel->srsubstate),
     442             : 
     443             :             /*
     444             :              * translator: first %s is a SQL ALTER command and second %s is a
     445             :              * SQL DROP command
     446             :              */
     447             :                      errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
     448             :                              "ALTER SUBSCRIPTION ... ENABLE",
     449             :                              "DROP SUBSCRIPTION ...")));
     450             :         }
     451             : 
     452         172 :         CatalogTupleDelete(rel, &tup->t_self);
     453             :     }
     454       42534 :     table_endscan(scan);
     455             : 
     456       42534 :     table_close(rel, RowExclusiveLock);
     457       42534 : }
     458             : 
     459             : /*
     460             :  * Does the subscription have any relations?
     461             :  *
     462             :  * Use this function only to know true/false, and when you have no need for the
     463             :  * List returned by GetSubscriptionRelations.
     464             :  */
     465             : bool
     466         386 : HasSubscriptionRelations(Oid subid)
     467             : {
     468             :     Relation    rel;
     469             :     ScanKeyData skey[1];
     470             :     SysScanDesc scan;
     471             :     bool        has_subrels;
     472             : 
     473         386 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     474             : 
     475         386 :     ScanKeyInit(&skey[0],
     476             :                 Anum_pg_subscription_rel_srsubid,
     477             :                 BTEqualStrategyNumber, F_OIDEQ,
     478             :                 ObjectIdGetDatum(subid));
     479             : 
     480         386 :     scan = systable_beginscan(rel, InvalidOid, false,
     481             :                               NULL, 1, skey);
     482             : 
     483             :     /* If even a single tuple exists then the subscription has tables. */
     484         386 :     has_subrels = HeapTupleIsValid(systable_getnext(scan));
     485             : 
     486             :     /* Cleanup */
     487         386 :     systable_endscan(scan);
     488         386 :     table_close(rel, AccessShareLock);
     489             : 
     490         386 :     return has_subrels;
     491             : }
     492             : 
     493             : /*
     494             :  * Get the relations for the subscription.
     495             :  *
     496             :  * If not_ready is true, return only the relations that are not in a ready
     497             :  * state, otherwise return all the relations of the subscription.  The
     498             :  * returned list is palloc'ed in the current memory context.
     499             :  */
     500             : List *
     501        1656 : GetSubscriptionRelations(Oid subid, bool not_ready)
     502             : {
     503        1656 :     List       *res = NIL;
     504             :     Relation    rel;
     505             :     HeapTuple   tup;
     506        1656 :     int         nkeys = 0;
     507             :     ScanKeyData skey[2];
     508             :     SysScanDesc scan;
     509             : 
     510        1656 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     511             : 
     512        1656 :     ScanKeyInit(&skey[nkeys++],
     513             :                 Anum_pg_subscription_rel_srsubid,
     514             :                 BTEqualStrategyNumber, F_OIDEQ,
     515             :                 ObjectIdGetDatum(subid));
     516             : 
     517        1656 :     if (not_ready)
     518        1600 :         ScanKeyInit(&skey[nkeys++],
     519             :                     Anum_pg_subscription_rel_srsubstate,
     520             :                     BTEqualStrategyNumber, F_CHARNE,
     521             :                     CharGetDatum(SUBREL_STATE_READY));
     522             : 
     523        1656 :     scan = systable_beginscan(rel, InvalidOid, false,
     524             :                               NULL, nkeys, skey);
     525             : 
     526        3956 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     527             :     {
     528             :         Form_pg_subscription_rel subrel;
     529             :         SubscriptionRelState *relstate;
     530             :         Datum       d;
     531             :         bool        isnull;
     532             : 
     533        2300 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     534             : 
     535        2300 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
     536        2300 :         relstate->relid = subrel->srrelid;
     537        2300 :         relstate->state = subrel->srsubstate;
     538        2300 :         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     539             :                             Anum_pg_subscription_rel_srsublsn, &isnull);
     540        2300 :         if (isnull)
     541        1822 :             relstate->lsn = InvalidXLogRecPtr;
     542             :         else
     543         478 :             relstate->lsn = DatumGetLSN(d);
     544             : 
     545        2300 :         res = lappend(res, relstate);
     546             :     }
     547             : 
     548             :     /* Cleanup */
     549        1656 :     systable_endscan(scan);
     550        1656 :     table_close(rel, AccessShareLock);
     551             : 
     552        1656 :     return res;
     553             : }

Generated by: LCOV version 1.14