LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_publication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 139 151 92.1 %
Date: 2019-11-22 06:06:53 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_publication.c
       4             :  *      publication C API manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      pg_publication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "access/xact.h"
      22             : #include "catalog/catalog.h"
      23             : #include "catalog/dependency.h"
      24             : #include "catalog/index.h"
      25             : #include "catalog/indexing.h"
      26             : #include "catalog/namespace.h"
      27             : #include "catalog/objectaccess.h"
      28             : #include "catalog/objectaddress.h"
      29             : #include "catalog/pg_publication.h"
      30             : #include "catalog/pg_publication_rel.h"
      31             : #include "catalog/pg_type.h"
      32             : #include "funcapi.h"
      33             : #include "miscadmin.h"
      34             : #include "utils/array.h"
      35             : #include "utils/builtins.h"
      36             : #include "utils/catcache.h"
      37             : #include "utils/fmgroids.h"
      38             : #include "utils/inval.h"
      39             : #include "utils/lsyscache.h"
      40             : #include "utils/rel.h"
      41             : #include "utils/syscache.h"
      42             : 
      43             : /*
      44             :  * Check if relation can be in given publication and throws appropriate
      45             :  * error if not.
      46             :  */
      47             : static void
      48          82 : check_publication_add_relation(Relation targetrel)
      49             : {
      50             :     /* Give more specific error for partitioned tables */
      51          82 :     if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
      52           4 :         ereport(ERROR,
      53             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
      54             :                  errmsg("\"%s\" is a partitioned table",
      55             :                         RelationGetRelationName(targetrel)),
      56             :                  errdetail("Adding partitioned tables to publications is not supported."),
      57             :                  errhint("You can add the table partitions individually.")));
      58             : 
      59             :     /* Must be table */
      60          78 :     if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
      61           8 :         ereport(ERROR,
      62             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      63             :                  errmsg("\"%s\" is not a table",
      64             :                         RelationGetRelationName(targetrel)),
      65             :                  errdetail("Only tables can be added to publications.")));
      66             : 
      67             :     /* Can't be system table */
      68          70 :     if (IsCatalogRelation(targetrel))
      69           0 :         ereport(ERROR,
      70             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      71             :                  errmsg("\"%s\" is a system table",
      72             :                         RelationGetRelationName(targetrel)),
      73             :                  errdetail("System tables cannot be added to publications.")));
      74             : 
      75             :     /* UNLOGGED and TEMP relations cannot be part of publication. */
      76          70 :     if (!RelationNeedsWAL(targetrel))
      77           0 :         ereport(ERROR,
      78             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      79             :                  errmsg("table \"%s\" cannot be replicated",
      80             :                         RelationGetRelationName(targetrel)),
      81             :                  errdetail("Temporary and unlogged relations cannot be replicated.")));
      82          70 : }
      83             : 
      84             : /*
      85             :  * Returns if relation represented by oid and Form_pg_class entry
      86             :  * is publishable.
      87             :  *
      88             :  * Does same checks as the above, but does not need relation to be opened
      89             :  * and also does not throw errors.
      90             :  *
      91             :  * XXX  This also excludes all tables with relid < FirstNormalObjectId,
      92             :  * ie all tables created during initdb.  This mainly affects the preinstalled
      93             :  * information_schema.  IsCatalogRelationOid() only excludes tables with
      94             :  * relid < FirstBootstrapObjectId, making that test rather redundant,
      95             :  * but really we should get rid of the FirstNormalObjectId test not
      96             :  * IsCatalogRelationOid.  We can't do so today because we don't want
      97             :  * information_schema tables to be considered publishable; but this test
      98             :  * is really inadequate for that, since the information_schema could be
      99             :  * dropped and reloaded and then it'll be considered publishable.  The best
     100             :  * long-term solution may be to add a "relispublishable" bool to pg_class,
     101             :  * and depend on that instead of OID checks.
     102             :  */
     103             : static bool
     104       16662 : is_publishable_class(Oid relid, Form_pg_class reltuple)
     105             : {
     106       33000 :     return reltuple->relkind == RELKIND_RELATION &&
     107       27042 :         !IsCatalogRelationOid(relid) &&
     108       37358 :         reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
     109             :         relid >= FirstNormalObjectId;
     110             : }
     111             : 
     112             : /*
     113             :  * Another variant of this, taking a Relation.
     114             :  */
     115             : bool
     116       13502 : is_publishable_relation(Relation rel)
     117             : {
     118       13502 :     return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
     119             : }
     120             : 
     121             : 
     122             : /*
     123             :  * SQL-callable variant of the above
     124             :  *
     125             :  * This returns null when the relation does not exist.  This is intended to be
     126             :  * used for example in psql to avoid gratuitous errors when there are
     127             :  * concurrent catalog changes.
     128             :  */
     129             : Datum
     130        1088 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
     131             : {
     132        1088 :     Oid         relid = PG_GETARG_OID(0);
     133             :     HeapTuple   tuple;
     134             :     bool        result;
     135             : 
     136        1088 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
     137        1088 :     if (!HeapTupleIsValid(tuple))
     138           0 :         PG_RETURN_NULL();
     139        1088 :     result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
     140        1088 :     ReleaseSysCache(tuple);
     141        1088 :     PG_RETURN_BOOL(result);
     142             : }
     143             : 
     144             : 
     145             : /*
     146             :  * Insert new publication / relation mapping.
     147             :  */
     148             : ObjectAddress
     149          90 : publication_add_relation(Oid pubid, Relation targetrel,
     150             :                          bool if_not_exists)
     151             : {
     152             :     Relation    rel;
     153             :     HeapTuple   tup;
     154             :     Datum       values[Natts_pg_publication_rel];
     155             :     bool        nulls[Natts_pg_publication_rel];
     156          90 :     Oid         relid = RelationGetRelid(targetrel);
     157             :     Oid         prrelid;
     158          90 :     Publication *pub = GetPublication(pubid);
     159             :     ObjectAddress myself,
     160             :                 referenced;
     161             : 
     162          90 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
     163             : 
     164             :     /*
     165             :      * Check for duplicates. Note that this does not really prevent
     166             :      * duplicates, it's here just to provide nicer error message in common
     167             :      * case. The real protection is the unique key on the catalog.
     168             :      */
     169          90 :     if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     170             :                               ObjectIdGetDatum(pubid)))
     171             :     {
     172           8 :         table_close(rel, RowExclusiveLock);
     173             : 
     174           8 :         if (if_not_exists)
     175           4 :             return InvalidObjectAddress;
     176             : 
     177           4 :         ereport(ERROR,
     178             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     179             :                  errmsg("relation \"%s\" is already member of publication \"%s\"",
     180             :                         RelationGetRelationName(targetrel), pub->name)));
     181             :     }
     182             : 
     183          82 :     check_publication_add_relation(targetrel);
     184             : 
     185             :     /* Form a tuple. */
     186          70 :     memset(values, 0, sizeof(values));
     187          70 :     memset(nulls, false, sizeof(nulls));
     188             : 
     189          70 :     prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
     190             :                                  Anum_pg_publication_rel_oid);
     191          70 :     values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid);
     192          70 :     values[Anum_pg_publication_rel_prpubid - 1] =
     193          70 :         ObjectIdGetDatum(pubid);
     194          70 :     values[Anum_pg_publication_rel_prrelid - 1] =
     195          70 :         ObjectIdGetDatum(relid);
     196             : 
     197          70 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     198             : 
     199             :     /* Insert tuple into catalog. */
     200          70 :     CatalogTupleInsert(rel, tup);
     201          70 :     heap_freetuple(tup);
     202             : 
     203          70 :     ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
     204             : 
     205             :     /* Add dependency on the publication */
     206          70 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     207          70 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     208             : 
     209             :     /* Add dependency on the relation */
     210          70 :     ObjectAddressSet(referenced, RelationRelationId, relid);
     211          70 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     212             : 
     213             :     /* Close the table. */
     214          70 :     table_close(rel, RowExclusiveLock);
     215             : 
     216             :     /* Invalidate relcache so that publication info is rebuilt. */
     217          70 :     CacheInvalidateRelcache(targetrel);
     218             : 
     219          70 :     return myself;
     220             : }
     221             : 
     222             : 
     223             : /*
     224             :  * Gets list of publication oids for a relation oid.
     225             :  */
     226             : List *
     227        2440 : GetRelationPublications(Oid relid)
     228             : {
     229        2440 :     List       *result = NIL;
     230             :     CatCList   *pubrellist;
     231             :     int         i;
     232             : 
     233             :     /* Find all publications associated with the relation. */
     234        2440 :     pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
     235             :                                      ObjectIdGetDatum(relid));
     236        2478 :     for (i = 0; i < pubrellist->n_members; i++)
     237             :     {
     238          38 :         HeapTuple   tup = &pubrellist->members[i]->tuple;
     239          38 :         Oid         pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
     240             : 
     241          38 :         result = lappend_oid(result, pubid);
     242             :     }
     243             : 
     244        2440 :     ReleaseSysCacheList(pubrellist);
     245             : 
     246        2440 :     return result;
     247             : }
     248             : 
     249             : /*
     250             :  * Gets list of relation oids for a publication.
     251             :  *
     252             :  * This should only be used for normal publications, the FOR ALL TABLES
     253             :  * should use GetAllTablesPublicationRelations().
     254             :  */
     255             : List *
     256          30 : GetPublicationRelations(Oid pubid)
     257             : {
     258             :     List       *result;
     259             :     Relation    pubrelsrel;
     260             :     ScanKeyData scankey;
     261             :     SysScanDesc scan;
     262             :     HeapTuple   tup;
     263             : 
     264             :     /* Find all publications associated with the relation. */
     265          30 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     266             : 
     267          30 :     ScanKeyInit(&scankey,
     268             :                 Anum_pg_publication_rel_prpubid,
     269             :                 BTEqualStrategyNumber, F_OIDEQ,
     270             :                 ObjectIdGetDatum(pubid));
     271             : 
     272          30 :     scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
     273             :                               true, NULL, 1, &scankey);
     274             : 
     275          30 :     result = NIL;
     276          94 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     277             :     {
     278             :         Form_pg_publication_rel pubrel;
     279             : 
     280          34 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     281             : 
     282          34 :         result = lappend_oid(result, pubrel->prrelid);
     283             :     }
     284             : 
     285          30 :     systable_endscan(scan);
     286          30 :     table_close(pubrelsrel, AccessShareLock);
     287             : 
     288          30 :     return result;
     289             : }
     290             : 
     291             : /*
     292             :  * Gets list of publication oids for publications marked as FOR ALL TABLES.
     293             :  */
     294             : List *
     295        2342 : GetAllTablesPublications(void)
     296             : {
     297             :     List       *result;
     298             :     Relation    rel;
     299             :     ScanKeyData scankey;
     300             :     SysScanDesc scan;
     301             :     HeapTuple   tup;
     302             : 
     303             :     /* Find all publications that are marked as for all tables. */
     304        2342 :     rel = table_open(PublicationRelationId, AccessShareLock);
     305             : 
     306        2342 :     ScanKeyInit(&scankey,
     307             :                 Anum_pg_publication_puballtables,
     308             :                 BTEqualStrategyNumber, F_BOOLEQ,
     309             :                 BoolGetDatum(true));
     310             : 
     311        2342 :     scan = systable_beginscan(rel, InvalidOid, false,
     312             :                               NULL, 1, &scankey);
     313             : 
     314        2342 :     result = NIL;
     315        4684 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     316             :     {
     317           0 :         Oid         oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
     318             : 
     319           0 :         result = lappend_oid(result, oid);
     320             :     }
     321             : 
     322        2342 :     systable_endscan(scan);
     323        2342 :     table_close(rel, AccessShareLock);
     324             : 
     325        2342 :     return result;
     326             : }
     327             : 
     328             : /*
     329             :  * Gets list of all relation published by FOR ALL TABLES publication(s).
     330             :  */
     331             : List *
     332          30 : GetAllTablesPublicationRelations(void)
     333             : {
     334             :     Relation    classRel;
     335             :     ScanKeyData key[1];
     336             :     TableScanDesc scan;
     337             :     HeapTuple   tuple;
     338          30 :     List       *result = NIL;
     339             : 
     340          30 :     classRel = table_open(RelationRelationId, AccessShareLock);
     341             : 
     342          30 :     ScanKeyInit(&key[0],
     343             :                 Anum_pg_class_relkind,
     344             :                 BTEqualStrategyNumber, F_CHAREQ,
     345             :                 CharGetDatum(RELKIND_RELATION));
     346             : 
     347          30 :     scan = table_beginscan_catalog(classRel, 1, key);
     348             : 
     349        2132 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     350             :     {
     351        2072 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     352        2072 :         Oid         relid = relForm->oid;
     353             : 
     354        2072 :         if (is_publishable_class(relid, relForm))
     355          62 :             result = lappend_oid(result, relid);
     356             :     }
     357             : 
     358          30 :     table_endscan(scan);
     359          30 :     table_close(classRel, AccessShareLock);
     360             : 
     361          30 :     return result;
     362             : }
     363             : 
     364             : /*
     365             :  * Get publication using oid
     366             :  *
     367             :  * The Publication struct and its data are palloc'ed here.
     368             :  */
     369             : Publication *
     370         174 : GetPublication(Oid pubid)
     371             : {
     372             :     HeapTuple   tup;
     373             :     Publication *pub;
     374             :     Form_pg_publication pubform;
     375             : 
     376         174 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
     377             : 
     378         174 :     if (!HeapTupleIsValid(tup))
     379           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
     380             : 
     381         174 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     382             : 
     383         174 :     pub = (Publication *) palloc(sizeof(Publication));
     384         174 :     pub->oid = pubid;
     385         174 :     pub->name = pstrdup(NameStr(pubform->pubname));
     386         174 :     pub->alltables = pubform->puballtables;
     387         174 :     pub->pubactions.pubinsert = pubform->pubinsert;
     388         174 :     pub->pubactions.pubupdate = pubform->pubupdate;
     389         174 :     pub->pubactions.pubdelete = pubform->pubdelete;
     390         174 :     pub->pubactions.pubtruncate = pubform->pubtruncate;
     391             : 
     392         174 :     ReleaseSysCache(tup);
     393             : 
     394         174 :     return pub;
     395             : }
     396             : 
     397             : 
     398             : /*
     399             :  * Get Publication using name.
     400             :  */
     401             : Publication *
     402          84 : GetPublicationByName(const char *pubname, bool missing_ok)
     403             : {
     404             :     Oid         oid;
     405             : 
     406          84 :     oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     407             :                           CStringGetDatum(pubname));
     408          84 :     if (!OidIsValid(oid))
     409             :     {
     410           0 :         if (missing_ok)
     411           0 :             return NULL;
     412             : 
     413           0 :         ereport(ERROR,
     414             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     415             :                  errmsg("publication \"%s\" does not exist", pubname)));
     416             :     }
     417             : 
     418          84 :     return GetPublication(oid);
     419             : }
     420             : 
     421             : /*
     422             :  * get_publication_oid - given a publication name, look up the OID
     423             :  *
     424             :  * If missing_ok is false, throw an error if name not found.  If true, just
     425             :  * return InvalidOid.
     426             :  */
     427             : Oid
     428          70 : get_publication_oid(const char *pubname, bool missing_ok)
     429             : {
     430             :     Oid         oid;
     431             : 
     432          70 :     oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     433             :                           CStringGetDatum(pubname));
     434          70 :     if (!OidIsValid(oid) && !missing_ok)
     435           4 :         ereport(ERROR,
     436             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     437             :                  errmsg("publication \"%s\" does not exist", pubname)));
     438          66 :     return oid;
     439             : }
     440             : 
     441             : /*
     442             :  * get_publication_name - given a publication Oid, look up the name
     443             :  *
     444             :  * If missing_ok is false, throw an error if name not found.  If true, just
     445             :  * return NULL.
     446             :  */
     447             : char *
     448          94 : get_publication_name(Oid pubid, bool missing_ok)
     449             : {
     450             :     HeapTuple   tup;
     451             :     char       *pubname;
     452             :     Form_pg_publication pubform;
     453             : 
     454          94 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
     455             : 
     456          94 :     if (!HeapTupleIsValid(tup))
     457             :     {
     458           0 :         if (!missing_ok)
     459           0 :             elog(ERROR, "cache lookup failed for publication %u", pubid);
     460           0 :         return NULL;
     461             :     }
     462             : 
     463          94 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     464          94 :     pubname = pstrdup(NameStr(pubform->pubname));
     465             : 
     466          94 :     ReleaseSysCache(tup);
     467             : 
     468          94 :     return pubname;
     469             : }
     470             : 
     471             : /*
     472             :  * Returns Oids of tables in a publication.
     473             :  */
     474             : Datum
     475         132 : pg_get_publication_tables(PG_FUNCTION_ARGS)
     476             : {
     477             :     FuncCallContext *funcctx;
     478         132 :     char       *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     479             :     Publication *publication;
     480             :     List       *tables;
     481             : 
     482             :     /* stuff done only on the first call of the function */
     483         132 :     if (SRF_IS_FIRSTCALL())
     484             :     {
     485             :         MemoryContext oldcontext;
     486             : 
     487             :         /* create a function context for cross-call persistence */
     488          44 :         funcctx = SRF_FIRSTCALL_INIT();
     489             : 
     490             :         /* switch to memory context appropriate for multiple function calls */
     491          44 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
     492             : 
     493          44 :         publication = GetPublicationByName(pubname, false);
     494          44 :         if (publication->alltables)
     495          30 :             tables = GetAllTablesPublicationRelations();
     496             :         else
     497          14 :             tables = GetPublicationRelations(publication->oid);
     498          44 :         funcctx->user_fctx = (void *) tables;
     499             : 
     500          44 :         MemoryContextSwitchTo(oldcontext);
     501             :     }
     502             : 
     503             :     /* stuff done on every call of the function */
     504         132 :     funcctx = SRF_PERCALL_SETUP();
     505         132 :     tables = (List *) funcctx->user_fctx;
     506             : 
     507         132 :     if (funcctx->call_cntr < list_length(tables))
     508             :     {
     509          88 :         Oid         relid = list_nth_oid(tables, funcctx->call_cntr);
     510             : 
     511          88 :         SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
     512             :     }
     513             : 
     514          44 :     SRF_RETURN_DONE(funcctx);
     515             : }

Generated by: LCOV version 1.13