LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_subscription.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 156 168 92.9 %
Date: 2019-06-19 14:06:47 Functions: 12 12 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_subscription.c
       4             :  *      replication subscriptions
       5             :  *
       6             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/catalog/pg_subscription.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "miscadmin.h"
      18             : 
      19             : #include "access/genam.h"
      20             : #include "access/heapam.h"
      21             : #include "access/htup_details.h"
      22             : #include "access/tableam.h"
      23             : #include "access/xact.h"
      24             : 
      25             : #include "catalog/indexing.h"
      26             : #include "catalog/pg_type.h"
      27             : #include "catalog/pg_subscription.h"
      28             : #include "catalog/pg_subscription_rel.h"
      29             : 
      30             : #include "nodes/makefuncs.h"
      31             : 
      32             : #include "storage/lmgr.h"
      33             : 
      34             : #include "utils/array.h"
      35             : #include "utils/builtins.h"
      36             : #include "utils/fmgroids.h"
      37             : #include "utils/pg_lsn.h"
      38             : #include "utils/rel.h"
      39             : #include "utils/syscache.h"
      40             : 
      41             : 
      42             : static List *textarray_to_stringlist(ArrayType *textarray);
      43             : 
      44             : /*
      45             :  * Fetch the subscription from the syscache.
      46             :  */
      47             : Subscription *
      48         196 : GetSubscription(Oid subid, bool missing_ok)
      49             : {
      50             :     HeapTuple   tup;
      51             :     Subscription *sub;
      52             :     Form_pg_subscription subform;
      53             :     Datum       datum;
      54             :     bool        isnull;
      55             : 
      56         196 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
      57             : 
      58         196 :     if (!HeapTupleIsValid(tup))
      59             :     {
      60           0 :         if (missing_ok)
      61           0 :             return NULL;
      62             : 
      63           0 :         elog(ERROR, "cache lookup failed for subscription %u", subid);
      64             :     }
      65             : 
      66         196 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
      67             : 
      68         196 :     sub = (Subscription *) palloc(sizeof(Subscription));
      69         196 :     sub->oid = subid;
      70         196 :     sub->dbid = subform->subdbid;
      71         196 :     sub->name = pstrdup(NameStr(subform->subname));
      72         196 :     sub->owner = subform->subowner;
      73         196 :     sub->enabled = subform->subenabled;
      74             : 
      75             :     /* Get conninfo */
      76         196 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
      77             :                             tup,
      78             :                             Anum_pg_subscription_subconninfo,
      79             :                             &isnull);
      80             :     Assert(!isnull);
      81         196 :     sub->conninfo = TextDatumGetCString(datum);
      82             : 
      83             :     /* Get slotname */
      84         196 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
      85             :                             tup,
      86             :                             Anum_pg_subscription_subslotname,
      87             :                             &isnull);
      88         196 :     if (!isnull)
      89         188 :         sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
      90             :     else
      91           8 :         sub->slotname = NULL;
      92             : 
      93             :     /* Get synccommit */
      94         196 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
      95             :                             tup,
      96             :                             Anum_pg_subscription_subsynccommit,
      97             :                             &isnull);
      98             :     Assert(!isnull);
      99         196 :     sub->synccommit = TextDatumGetCString(datum);
     100             : 
     101             :     /* Get publications */
     102         196 :     datum = SysCacheGetAttr(SUBSCRIPTIONOID,
     103             :                             tup,
     104             :                             Anum_pg_subscription_subpublications,
     105             :                             &isnull);
     106             :     Assert(!isnull);
     107         196 :     sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
     108             : 
     109         196 :     ReleaseSysCache(tup);
     110             : 
     111         196 :     return sub;
     112             : }
     113             : 
     114             : /*
     115             :  * Return number of subscriptions defined in given database.
     116             :  * Used by dropdb() to check if database can indeed be dropped.
     117             :  */
     118             : int
     119          16 : CountDBSubscriptions(Oid dbid)
     120             : {
     121          16 :     int         nsubs = 0;
     122             :     Relation    rel;
     123             :     ScanKeyData scankey;
     124             :     SysScanDesc scan;
     125             :     HeapTuple   tup;
     126             : 
     127          16 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
     128             : 
     129          16 :     ScanKeyInit(&scankey,
     130             :                 Anum_pg_subscription_subdbid,
     131             :                 BTEqualStrategyNumber, F_OIDEQ,
     132             :                 ObjectIdGetDatum(dbid));
     133             : 
     134          16 :     scan = systable_beginscan(rel, InvalidOid, false,
     135             :                               NULL, 1, &scankey);
     136             : 
     137          32 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     138           0 :         nsubs++;
     139             : 
     140          16 :     systable_endscan(scan);
     141             : 
     142          16 :     table_close(rel, NoLock);
     143             : 
     144          16 :     return nsubs;
     145             : }
     146             : 
     147             : /*
     148             :  * Free memory allocated by subscription struct.
     149             :  */
     150             : void
     151           8 : FreeSubscription(Subscription *sub)
     152             : {
     153           8 :     pfree(sub->name);
     154           8 :     pfree(sub->conninfo);
     155           8 :     if (sub->slotname)
     156           8 :         pfree(sub->slotname);
     157           8 :     list_free_deep(sub->publications);
     158           8 :     pfree(sub);
     159           8 : }
     160             : 
     161             : /*
     162             :  * get_subscription_oid - given a subscription name, look up the OID
     163             :  *
     164             :  * If missing_ok is false, throw an error if name not found.  If true, just
     165             :  * return InvalidOid.
     166             :  */
     167             : Oid
     168          34 : get_subscription_oid(const char *subname, bool missing_ok)
     169             : {
     170             :     Oid         oid;
     171             : 
     172          34 :     oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
     173             :                           MyDatabaseId, CStringGetDatum(subname));
     174          34 :     if (!OidIsValid(oid) && !missing_ok)
     175           4 :         ereport(ERROR,
     176             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     177             :                  errmsg("subscription \"%s\" does not exist", subname)));
     178          30 :     return oid;
     179             : }
     180             : 
     181             : /*
     182             :  * get_subscription_name - given a subscription OID, look up the name
     183             :  *
     184             :  * If missing_ok is false, throw an error if name not found.  If true, just
     185             :  * return NULL.
     186             :  */
     187             : char *
     188          28 : get_subscription_name(Oid subid, bool missing_ok)
     189             : {
     190             :     HeapTuple   tup;
     191             :     char       *subname;
     192             :     Form_pg_subscription subform;
     193             : 
     194          28 :     tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
     195             : 
     196          28 :     if (!HeapTupleIsValid(tup))
     197             :     {
     198           0 :         if (!missing_ok)
     199           0 :             elog(ERROR, "cache lookup failed for subscription %u", subid);
     200           0 :         return NULL;
     201             :     }
     202             : 
     203          28 :     subform = (Form_pg_subscription) GETSTRUCT(tup);
     204          28 :     subname = pstrdup(NameStr(subform->subname));
     205             : 
     206          28 :     ReleaseSysCache(tup);
     207             : 
     208          28 :     return subname;
     209             : }
     210             : 
     211             : /*
     212             :  * Convert text array to list of strings.
     213             :  *
     214             :  * Note: the resulting list of strings is pallocated here.
     215             :  */
     216             : static List *
     217         196 : textarray_to_stringlist(ArrayType *textarray)
     218             : {
     219             :     Datum      *elems;
     220             :     int         nelems,
     221             :                 i;
     222         196 :     List       *res = NIL;
     223             : 
     224         196 :     deconstruct_array(textarray,
     225             :                       TEXTOID, -1, false, 'i',
     226             :                       &elems, NULL, &nelems);
     227             : 
     228         196 :     if (nelems == 0)
     229           0 :         return NIL;
     230             : 
     231         446 :     for (i = 0; i < nelems; i++)
     232         250 :         res = lappend(res, makeString(TextDatumGetCString(elems[i])));
     233             : 
     234         196 :     return res;
     235             : }
     236             : 
     237             : /*
     238             :  * Add new state record for a subscription table.
     239             :  */
     240             : void
     241          78 : AddSubscriptionRelState(Oid subid, Oid relid, char state,
     242             :                         XLogRecPtr sublsn)
     243             : {
     244             :     Relation    rel;
     245             :     HeapTuple   tup;
     246             :     bool        nulls[Natts_pg_subscription_rel];
     247             :     Datum       values[Natts_pg_subscription_rel];
     248             : 
     249          78 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     250             : 
     251          78 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     252             : 
     253             :     /* Try finding existing mapping. */
     254          78 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     255             :                               ObjectIdGetDatum(relid),
     256             :                               ObjectIdGetDatum(subid));
     257          78 :     if (HeapTupleIsValid(tup))
     258           0 :         elog(ERROR, "subscription table %u in subscription %u already exists",
     259             :              relid, subid);
     260             : 
     261             :     /* Form the tuple. */
     262          78 :     memset(values, 0, sizeof(values));
     263          78 :     memset(nulls, false, sizeof(nulls));
     264          78 :     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     265          78 :     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     266          78 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     267          78 :     if (sublsn != InvalidXLogRecPtr)
     268           0 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     269             :     else
     270          78 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     271             : 
     272          78 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     273             : 
     274             :     /* Insert tuple into catalog. */
     275          78 :     CatalogTupleInsert(rel, tup);
     276             : 
     277          78 :     heap_freetuple(tup);
     278             : 
     279             :     /* Cleanup. */
     280          78 :     table_close(rel, NoLock);
     281          78 : }
     282             : 
     283             : /*
     284             :  * Update the state of a subscription table.
     285             :  */
     286             : void
     287         212 : UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     288             :                            XLogRecPtr sublsn)
     289             : {
     290             :     Relation    rel;
     291             :     HeapTuple   tup;
     292             :     bool        nulls[Natts_pg_subscription_rel];
     293             :     Datum       values[Natts_pg_subscription_rel];
     294             :     bool        replaces[Natts_pg_subscription_rel];
     295             : 
     296         212 :     LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
     297             : 
     298         212 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     299             : 
     300             :     /* Try finding existing mapping. */
     301         212 :     tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
     302             :                               ObjectIdGetDatum(relid),
     303             :                               ObjectIdGetDatum(subid));
     304         212 :     if (!HeapTupleIsValid(tup))
     305           0 :         elog(ERROR, "subscription table %u in subscription %u does not exist",
     306             :              relid, subid);
     307             : 
     308             :     /* Update the tuple. */
     309         212 :     memset(values, 0, sizeof(values));
     310         212 :     memset(nulls, false, sizeof(nulls));
     311         212 :     memset(replaces, false, sizeof(replaces));
     312             : 
     313         212 :     replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
     314         212 :     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
     315             : 
     316         212 :     replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
     317         212 :     if (sublsn != InvalidXLogRecPtr)
     318         140 :         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     319             :     else
     320          72 :         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
     321             : 
     322         212 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     323             :                             replaces);
     324             : 
     325             :     /* Update the catalog. */
     326         212 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     327             : 
     328             :     /* Cleanup. */
     329         212 :     table_close(rel, NoLock);
     330         212 : }
     331             : 
     332             : /*
     333             :  * Get state of subscription table.
     334             :  *
     335             :  * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
     336             :  */
     337             : char
     338         392 : GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
     339             :                         bool missing_ok)
     340             : {
     341             :     Relation    rel;
     342             :     HeapTuple   tup;
     343             :     char        substate;
     344             :     bool        isnull;
     345             :     Datum       d;
     346             : 
     347         392 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     348             : 
     349             :     /* Try finding the mapping. */
     350         392 :     tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
     351             :                           ObjectIdGetDatum(relid),
     352             :                           ObjectIdGetDatum(subid));
     353             : 
     354         392 :     if (!HeapTupleIsValid(tup))
     355             :     {
     356          24 :         if (missing_ok)
     357             :         {
     358          24 :             table_close(rel, AccessShareLock);
     359          24 :             *sublsn = InvalidXLogRecPtr;
     360          24 :             return SUBREL_STATE_UNKNOWN;
     361             :         }
     362             : 
     363           0 :         elog(ERROR, "subscription table %u in subscription %u does not exist",
     364             :              relid, subid);
     365             :     }
     366             : 
     367             :     /* Get the state. */
     368         368 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     369             :                         Anum_pg_subscription_rel_srsubstate, &isnull);
     370             :     Assert(!isnull);
     371         368 :     substate = DatumGetChar(d);
     372         368 :     d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
     373             :                         Anum_pg_subscription_rel_srsublsn, &isnull);
     374         368 :     if (isnull)
     375         224 :         *sublsn = InvalidXLogRecPtr;
     376             :     else
     377         144 :         *sublsn = DatumGetLSN(d);
     378             : 
     379             :     /* Cleanup */
     380         368 :     ReleaseSysCache(tup);
     381         368 :     table_close(rel, AccessShareLock);
     382             : 
     383         368 :     return substate;
     384             : }
     385             : 
     386             : /*
     387             :  * Drop subscription relation mapping. These can be for a particular
     388             :  * subscription, or for a particular relation, or both.
     389             :  */
     390             : void
     391       22290 : RemoveSubscriptionRel(Oid subid, Oid relid)
     392             : {
     393             :     Relation    rel;
     394             :     TableScanDesc scan;
     395             :     ScanKeyData skey[2];
     396             :     HeapTuple   tup;
     397       22290 :     int         nkeys = 0;
     398             : 
     399       22290 :     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
     400             : 
     401       22290 :     if (OidIsValid(subid))
     402             :     {
     403          38 :         ScanKeyInit(&skey[nkeys++],
     404             :                     Anum_pg_subscription_rel_srsubid,
     405             :                     BTEqualStrategyNumber,
     406             :                     F_OIDEQ,
     407             :                     ObjectIdGetDatum(subid));
     408             :     }
     409             : 
     410       22290 :     if (OidIsValid(relid))
     411             :     {
     412       22262 :         ScanKeyInit(&skey[nkeys++],
     413             :                     Anum_pg_subscription_rel_srrelid,
     414             :                     BTEqualStrategyNumber,
     415             :                     F_OIDEQ,
     416             :                     ObjectIdGetDatum(relid));
     417             :     }
     418             : 
     419             :     /* Do the search and delete what we found. */
     420       22290 :     scan = table_beginscan_catalog(rel, nkeys, skey);
     421       44608 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
     422             :     {
     423          28 :         CatalogTupleDelete(rel, &tup->t_self);
     424             :     }
     425       22290 :     table_endscan(scan);
     426             : 
     427       22290 :     table_close(rel, RowExclusiveLock);
     428       22290 : }
     429             : 
     430             : 
     431             : /*
     432             :  * Get all relations for subscription.
     433             :  *
     434             :  * Returned list is palloc'ed in current memory context.
     435             :  */
     436             : List *
     437           6 : GetSubscriptionRelations(Oid subid)
     438             : {
     439           6 :     List       *res = NIL;
     440             :     Relation    rel;
     441             :     HeapTuple   tup;
     442           6 :     int         nkeys = 0;
     443             :     ScanKeyData skey[2];
     444             :     SysScanDesc scan;
     445             : 
     446           6 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     447             : 
     448           6 :     ScanKeyInit(&skey[nkeys++],
     449             :                 Anum_pg_subscription_rel_srsubid,
     450             :                 BTEqualStrategyNumber, F_OIDEQ,
     451             :                 ObjectIdGetDatum(subid));
     452             : 
     453           6 :     scan = systable_beginscan(rel, InvalidOid, false,
     454             :                               NULL, nkeys, skey);
     455             : 
     456          28 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     457             :     {
     458             :         Form_pg_subscription_rel subrel;
     459             :         SubscriptionRelState *relstate;
     460             : 
     461          16 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     462             : 
     463          16 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
     464          16 :         relstate->relid = subrel->srrelid;
     465          16 :         relstate->state = subrel->srsubstate;
     466          16 :         relstate->lsn = subrel->srsublsn;
     467             : 
     468          16 :         res = lappend(res, relstate);
     469             :     }
     470             : 
     471             :     /* Cleanup */
     472           6 :     systable_endscan(scan);
     473           6 :     table_close(rel, AccessShareLock);
     474             : 
     475           6 :     return res;
     476             : }
     477             : 
     478             : /*
     479             :  * Get all relations for subscription that are not in a ready state.
     480             :  *
     481             :  * Returned list is palloc'ed in current memory context.
     482             :  */
     483             : List *
     484         226 : GetSubscriptionNotReadyRelations(Oid subid)
     485             : {
     486         226 :     List       *res = NIL;
     487             :     Relation    rel;
     488             :     HeapTuple   tup;
     489         226 :     int         nkeys = 0;
     490             :     ScanKeyData skey[2];
     491             :     SysScanDesc scan;
     492             : 
     493         226 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     494             : 
     495         226 :     ScanKeyInit(&skey[nkeys++],
     496             :                 Anum_pg_subscription_rel_srsubid,
     497             :                 BTEqualStrategyNumber, F_OIDEQ,
     498             :                 ObjectIdGetDatum(subid));
     499             : 
     500         226 :     ScanKeyInit(&skey[nkeys++],
     501             :                 Anum_pg_subscription_rel_srsubstate,
     502             :                 BTEqualStrategyNumber, F_CHARNE,
     503             :                 CharGetDatum(SUBREL_STATE_READY));
     504             : 
     505         226 :     scan = systable_beginscan(rel, InvalidOid, false,
     506             :                               NULL, nkeys, skey);
     507             : 
     508        1022 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     509             :     {
     510             :         Form_pg_subscription_rel subrel;
     511             :         SubscriptionRelState *relstate;
     512             : 
     513         570 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     514             : 
     515         570 :         relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
     516         570 :         relstate->relid = subrel->srrelid;
     517         570 :         relstate->state = subrel->srsubstate;
     518         570 :         relstate->lsn = subrel->srsublsn;
     519             : 
     520         570 :         res = lappend(res, relstate);
     521             :     }
     522             : 
     523             :     /* Cleanup */
     524         226 :     systable_endscan(scan);
     525         226 :     table_close(rel, AccessShareLock);
     526             : 
     527         226 :     return res;
     528             : }

Generated by: LCOV version 1.13