LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 156 168 92.9 %
Date: 2019-11-22 08:06:54 Functions: 12 12 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_subscription.c
       4             :  *      replication subscriptions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_subscription.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "access/xact.h"
      22             : #include "catalog/indexing.h"
      23             : #include "catalog/pg_subscription.h"
      24             : #include "catalog/pg_subscription_rel.h"
      25             : #include "catalog/pg_type.h"
      26             : #include "miscadmin.h"
      27             : #include "nodes/makefuncs.h"
      28             : #include "storage/lmgr.h"
      29             : #include "utils/array.h"
      30             : #include "utils/builtins.h"
      31             : #include "utils/fmgroids.h"
      32             : #include "utils/pg_lsn.h"
      33             : #include "utils/rel.h"
      34             : #include "utils/syscache.h"
      35             : 
      36             : static List *textarray_to_stringlist(ArrayType *textarray);
      37             : 
      38             : /*
      39             :  * Fetch the subscription from the syscache.
      40             :  */
      41             : Subscription *
      42         200 : GetSubscription(Oid subid, bool missing_ok)
      43             : {
      44             :     HeapTuple   tup;
      45             :     Subscription *sub;
      46             :     Form_pg_subscription subform;
      47             :     Datum       datum;
      48             :     bool        isnull;
      49             : 
      50         200 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      51             : 
      52         200 :     if (!HeapTupleIsValid(tup))
      53             :     {
      54           0 :         if (missing_ok)
      55           0 :             return NULL;
      56             : 
      57           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
      58             :     }
      59             : 
      60         200 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
      61             : 
      62         200 :     sub = (Subscription *) palloc(sizeof(Subscription));
      63         200 :     sub->oid = subid;
      64         200 :     sub->dbid = subform->subdbid;
      65         200 :     sub->name = pstrdup(NameStr(subform->subname));
      66         200 :     sub->owner = subform->subowner;
      67         200 :     sub->enabled = subform->subenabled;
      68             : 
      69             :     /* Get conninfo */
      70         200 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
      71             :                             tup,
      72             :                             Anum_pg_subscription_subconninfo,
      73             :                             &isnull);
      74             :     Assert(!isnull);
      75         200 :     sub->conninfo = TextDatumGetCString(datum);
      76             : 
      77             :     /* Get slotname */
      78         200 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
      79             :                             tup,
      80             :                             Anum_pg_subscription_subslotname,
      81             :                             &isnull);
      82         200 :     if (!isnull)
      83         192 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
      84             :     else
      85           8 :         sub->slotname = NULL;
      86             : 
      87             :     /* Get synccommit */
      88         200 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
      89             :                             tup,
      90             :                             Anum_pg_subscription_subsynccommit,
      91             :                             &isnull);
      92             :     Assert(!isnull);
      93         200 :     sub->synccommit = TextDatumGetCString(datum);
      94             : 
      95             :     /* Get publications */
      96         200 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
      97             :                             tup,
      98             :                             Anum_pg_subscription_subpublications,
      99             :                             &isnull);
     100             :     Assert(!isnull);
     101         200 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     102             : 
     103         200 :     ReleaseSysCache(tup);
     104             : 
     105         200 :     return sub;
     106             : }
     107             : 
     108             : /*
     109             :  * Return number of subscriptions defined in given database.
     110             :  * Used by dropdb() to check if database can indeed be dropped.
     111             :  */
     112             : int
     113          18 : CountDBSubscriptions(Oid dbid)
     114             : {
     115          18 :     int         nsubs = 0;
     116             :     Relation    rel;
     117             :     ScanKeyData scankey;
     118             :     SysScanDesc scan;
     119             :     HeapTuple   tup;
     120             : 
     121          18 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     122             : 
     123          18 :     ScanKeyInit(&scankey,
     124             :                 Anum_pg_subscription_subdbid,
     125             :                 BTEqualStrategyNumber, F_OIDEQ,
     126             :                 ObjectIdGetDatum(dbid));
     127             : 
     128          18 :     scan = systable_beginscan(rel, InvalidOid, false,
     129             :                               NULL, 1, &scankey);
     130             : 
     131          36 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     132           0 :         nsubs++;
     133             : 
     134          18 :     systable_endscan(scan);
     135             : 
     136          18 :     table_close(rel, NoLock);
     137             : 
     138          18 :     return nsubs;
     139             : }
     140             : 
     141             : /*
     142             :  * Free memory allocated by subscription struct.
     143             :  */
     144             : void
     145           8 : FreeSubscription(Subscription *sub)
     146             : {
     147           8 :     pfree(sub->name);
     148           8 :     pfree(sub->conninfo);
     149           8 :     if (sub->slotname)
     150           8 :         pfree(sub->slotname);
     151           8 :     list_free_deep(sub->publications);
     152           8 :     pfree(sub);
     153           8 : }
     154             : 
     155             : /*
     156             :  * get_subscription_oid - given a subscription name, look up the OID
     157             :  *
     158             :  * If missing_ok is false, throw an error if name not found.  If true, just
     159             :  * return InvalidOid.
     160             :  */
     161             : Oid
     162          34 : get_subscription_oid(const char *subname, bool missing_ok)
     163             : {
     164             :     Oid         oid;
     165             : 
     166          34 :     oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
     167             :                           MyDatabaseId, CStringGetDatum(subname));
     168          34 :     if (!OidIsValid(oid) && !missing_ok)
     169           4 :         ereport(ERROR,
     170             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     171             :                  errmsg("subscription \"%s\" does not exist", subname)));
     172          30 :     return oid;
     173             : }
     174             : 
     175             : /*
     176             :  * get_subscription_name - given a subscription OID, look up the name
     177             :  *
     178             :  * If missing_ok is false, throw an error if name not found.  If true, just
     179             :  * return NULL.
     180             :  */
     181             : char *
     182          28 : get_subscription_name(Oid subid, bool missing_ok)
     183             : {
     184             :     HeapTuple   tup;
     185             :     char       *subname;
     186             :     Form_pg_subscription subform;
     187             : 
     188          28 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     189             : 
     190          28 :     if (!HeapTupleIsValid(tup))
     191             :     {
     192           0 :         if (!missing_ok)
     193           0 :             elog(ERROR, "cache lookup failed for subscription %u", subid);
     194           0 :         return NULL;
     195             :     }
     196             : 
     197          28 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
     198          28 :     subname = pstrdup(NameStr(subform->subname));
     199             : 
     200          28 :     ReleaseSysCache(tup);
     201             : 
     202          28 :     return subname;
     203             : }
     204             : 
     205             : /*
     206             :  * Convert text array to list of strings.
     207             :  *
     208             :  * Note: the resulting list of strings is pallocated here.
     209             :  */
     210             : static List *
     211         200 : textarray_to_stringlist(ArrayType *textarray)
     212             : {
     213             :     Datum      *elems;
     214             :     int         nelems,
     215             :                 i;
     216         200 :     List       *res = NIL;
     217             : 
     218         200 :     deconstruct_array(textarray,
     219             :                       TEXTOID, -1, false, 'i',
     220             :                       &elems, NULL, &nelems);
     221             : 
     222         200 :     if (nelems == 0)
     223           0 :         return NIL;
     224             : 
     225         454 :     for (i = 0; i < nelems; i++)
     226         254 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     227             : 
     228         200 :     return res;
     229             : }
     230             : 
     231             : /*
     232             :  * Add new state record for a subscription table.
     233             :  */
     234             : void
     235          80 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     236             :                         XLogRecPtr sublsn)
     237             : {
     238             :     Relation    rel;
     239             :     HeapTuple   tup;
     240             :     bool        nulls[Natts_pg_subscription_rel];
     241             :     Datum       values[Natts_pg_subscription_rel];
     242             : 
     243          80 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     244             : 
     245          80 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     246             : 
     247             :     /* Try finding existing mapping. */
     248          80 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     249             :                               ObjectIdGetDatum(relid),
     250             :                               ObjectIdGetDatum(subid));
     251          80 :     if (HeapTupleIsValid(tup))
     252           0 :         elog(ERROR, "subscription table %u in subscription %u already exists",
     253             :              relid, subid);
     254             : 
     255             :     /* Form the tuple. */
     256          80 :     memset(values, 0, sizeof(values));
     257          80 :     memset(nulls, false, sizeof(nulls));
     258          80 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     259          80 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     260          80 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     261          80 :     if (sublsn != InvalidXLogRecPtr)
     262           0 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     263             :     else
     264          80 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     265             : 
     266          80 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     267             : 
     268             :     /* Insert tuple into catalog. */
     269          80 :     CatalogTupleInsert(rel, tup);
     270             : 
     271          80 :     heap_freetuple(tup);
     272             : 
     273             :     /* Cleanup. */
     274          80 :     table_close(rel, NoLock);
     275          80 : }
     276             : 
     277             : /*
     278             :  * Update the state of a subscription table.
     279             :  */
     280             : void
     281         218 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     282             :                            XLogRecPtr sublsn)
     283             : {
     284             :     Relation    rel;
     285             :     HeapTuple   tup;
     286             :     bool        nulls[Natts_pg_subscription_rel];
     287             :     Datum       values[Natts_pg_subscription_rel];
     288             :     bool        replaces[Natts_pg_subscription_rel];
     289             : 
     290         218 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     291             : 
     292         218 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     293             : 
     294             :     /* Try finding existing mapping. */
     295         218 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     296             :                               ObjectIdGetDatum(relid),
     297             :                               ObjectIdGetDatum(subid));
     298         218 :     if (!HeapTupleIsValid(tup))
     299           0 :         elog(ERROR, "subscription table %u in subscription %u does not exist",
     300             :              relid, subid);
     301             : 
     302             :     /* Update the tuple. */
     303         218 :     memset(values, 0, sizeof(values));
     304         218 :     memset(nulls, false, sizeof(nulls));
     305         218 :     memset(replaces, false, sizeof(replaces));
     306             : 
     307         218 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     308         218 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     309             : 
     310         218 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     311         218 :     if (sublsn != InvalidXLogRecPtr)
     312         144 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     313             :     else
     314          74 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     315             : 
     316         218 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     317             :                             replaces);
     318             : 
     319             :     /* Update the catalog. */
     320         218 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     321             : 
     322             :     /* Cleanup. */
     323         218 :     table_close(rel, NoLock);
     324         218 : }
     325             : 
     326             : /*
     327             :  * Get state of subscription table.
     328             :  *
     329             :  * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
     330             :  */
     331             : char
     332         396 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
     333             :                         bool missing_ok)
     334             : {
     335             :     Relation    rel;
     336             :     HeapTuple   tup;
     337             :     char        substate;
     338             :     bool        isnull;
     339             :     Datum       d;
     340             : 
     341         396 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     342             : 
     343             :     /* Try finding the mapping. */
     344         396 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     345             :                           ObjectIdGetDatum(relid),
     346             :                           ObjectIdGetDatum(subid));
     347             : 
     348         396 :     if (!HeapTupleIsValid(tup))
     349             :     {
     350          24 :         if (missing_ok)
     351             :         {
     352          24 :             table_close(rel, AccessShareLock);
     353          24 :             *sublsn = InvalidXLogRecPtr;
     354          24 :             return SUBREL_STATE_UNKNOWN;
     355             :         }
     356             : 
     357           0 :         elog(ERROR, "subscription table %u in subscription %u does not exist",
     358             :              relid, subid);
     359             :     }
     360             : 
     361             :     /* Get the state. */
     362         372 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     363             :                         Anum_pg_subscription_rel_srsubstate, &isnull);
     364             :     Assert(!isnull);
     365         372 :     substate = DatumGetChar(d);
     366         372 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     367             :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     368         372 :     if (isnull)
     369         228 :         *sublsn = InvalidXLogRecPtr;
     370             :     else
     371         144 :         *sublsn = DatumGetLSN(d);
     372             : 
     373             :     /* Cleanup */
     374         372 :     ReleaseSysCache(tup);
     375         372 :     table_close(rel, AccessShareLock);
     376             : 
     377         372 :     return substate;
     378             : }
     379             : 
     380             : /*
     381             :  * Drop subscription relation mapping. These can be for a particular
     382             :  * subscription, or for a particular relation, or both.
     383             :  */
     384             : void
     385       23160 : RemoveSubscriptionRel(Oid subid, Oid relid)
     386             : {
     387             :     Relation    rel;
     388             :     TableScanDesc scan;
     389             :     ScanKeyData skey[2];
     390             :     HeapTuple   tup;
     391       23160 :     int         nkeys = 0;
     392             : 
     393       23160 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     394             : 
     395       23160 :     if (OidIsValid(subid))
     396             :     {
     397          38 :         ScanKeyInit(&skey[nkeys++],
     398             :                     Anum_pg_subscription_rel_srsubid,
     399             :                     BTEqualStrategyNumber,
     400             :                     F_OIDEQ,
     401             :                     ObjectIdGetDatum(subid));
     402             :     }
     403             : 
     404       23160 :     if (OidIsValid(relid))
     405             :     {
     406       23132 :         ScanKeyInit(&skey[nkeys++],
     407             :                     Anum_pg_subscription_rel_srrelid,
     408             :                     BTEqualStrategyNumber,
     409             :                     F_OIDEQ,
     410             :                     ObjectIdGetDatum(relid));
     411             :     }
     412             : 
     413             :     /* Do the search and delete what we found. */
     414       23160 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     415       46348 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     416             :     {
     417          28 :         CatalogTupleDelete(rel, &tup->t_self);
     418             :     }
     419       23160 :     table_endscan(scan);
     420             : 
     421       23160 :     table_close(rel, RowExclusiveLock);
     422       23160 : }
     423             : 
     424             : 
     425             : /*
     426             :  * Get all relations for subscription.
     427             :  *
     428             :  * Returned list is palloc'ed in current memory context.
     429             :  */
     430             : List *
     431           8 : GetSubscriptionRelations(Oid subid)
     432             : {
     433           8 :     List       *res = NIL;
     434             :     Relation    rel;
     435             :     HeapTuple   tup;
     436           8 :     int         nkeys = 0;
     437             :     ScanKeyData skey[2];
     438             :     SysScanDesc scan;
     439             : 
     440           8 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     441             : 
     442           8 :     ScanKeyInit(&skey[nkeys++],
     443             :                 Anum_pg_subscription_rel_srsubid,
     444             :                 BTEqualStrategyNumber, F_OIDEQ,
     445             :                 ObjectIdGetDatum(subid));
     446             : 
     447           8 :     scan = systable_beginscan(rel, InvalidOid, false,
     448             :                               NULL, nkeys, skey);
     449             : 
     450          34 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     451             :     {
     452             :         Form_pg_subscription_rel subrel;
     453             :         SubscriptionRelState *relstate;
     454             : 
     455          18 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     456             : 
     457          18 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
     458          18 :         relstate->relid = subrel->srrelid;
     459          18 :         relstate->state = subrel->srsubstate;
     460          18 :         relstate->lsn = subrel->srsublsn;
     461             : 
     462          18 :         res = lappend(res, relstate);
     463             :     }
     464             : 
     465             :     /* Cleanup */
     466           8 :     systable_endscan(scan);
     467           8 :     table_close(rel, AccessShareLock);
     468             : 
     469           8 :     return res;
     470             : }
     471             : 
     472             : /*
     473             :  * Get all relations for subscription that are not in a ready state.
     474             :  *
     475             :  * Returned list is palloc'ed in current memory context.
     476             :  */
     477             : List *
     478         238 : GetSubscriptionNotReadyRelations(Oid subid)
     479             : {
     480         238 :     List       *res = NIL;
     481             :     Relation    rel;
     482             :     HeapTuple   tup;
     483         238 :     int         nkeys = 0;
     484             :     ScanKeyData skey[2];
     485             :     SysScanDesc scan;
     486             : 
     487         238 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     488             : 
     489         238 :     ScanKeyInit(&skey[nkeys++],
     490             :                 Anum_pg_subscription_rel_srsubid,
     491             :                 BTEqualStrategyNumber, F_OIDEQ,
     492             :                 ObjectIdGetDatum(subid));
     493             : 
     494         238 :     ScanKeyInit(&skey[nkeys++],
     495             :                 Anum_pg_subscription_rel_srsubstate,
     496             :                 BTEqualStrategyNumber, F_CHARNE,
     497             :                 CharGetDatum(SUBREL_STATE_READY));
     498             : 
     499         238 :     scan = systable_beginscan(rel, InvalidOid, false,
     500             :                               NULL, nkeys, skey);
     501             : 
     502        1064 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     503             :     {
     504             :         Form_pg_subscription_rel subrel;
     505             :         SubscriptionRelState *relstate;
     506             : 
     507         588 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     508             : 
     509         588 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
     510         588 :         relstate->relid = subrel->srrelid;
     511         588 :         relstate->state = subrel->srsubstate;
     512         588 :         relstate->lsn = subrel->srsublsn;
     513             : 
     514         588 :         res = lappend(res, relstate);
     515             :     }
     516             : 
     517             :     /* Cleanup */
     518         238 :     systable_endscan(scan);
     519         238 :     table_close(rel, AccessShareLock);
     520             : 
     521         238 :     return res;
     522             : }

Generated by: LCOV version 1.13