LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_publication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 139 151 92.1 %
Date: 2019-09-19 02:07:14 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_publication.c
       4             :  *      publication C API manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      pg_publication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "funcapi.h"
      18             : #include "miscadmin.h"
      19             : 
      20             : #include "access/genam.h"
      21             : #include "access/heapam.h"
      22             : #include "access/htup_details.h"
      23             : #include "access/tableam.h"
      24             : #include "access/xact.h"
      25             : 
      26             : #include "catalog/catalog.h"
      27             : #include "catalog/dependency.h"
      28             : #include "catalog/index.h"
      29             : #include "catalog/indexing.h"
      30             : #include "catalog/namespace.h"
      31             : #include "catalog/objectaccess.h"
      32             : #include "catalog/objectaddress.h"
      33             : #include "catalog/pg_type.h"
      34             : #include "catalog/pg_publication.h"
      35             : #include "catalog/pg_publication_rel.h"
      36             : 
      37             : #include "utils/array.h"
      38             : #include "utils/builtins.h"
      39             : #include "utils/catcache.h"
      40             : #include "utils/fmgroids.h"
      41             : #include "utils/inval.h"
      42             : #include "utils/lsyscache.h"
      43             : #include "utils/rel.h"
      44             : #include "utils/syscache.h"
      45             : 
      46             : /*
      47             :  * Check if relation can be in given publication and throw an appropriate
      48             :  * error if not.
                     :  *
                     :  * The checks run in this order: partitioned table, not a plain table,
                     :  * system catalog relation, and finally relations for which
                     :  * RelationNeedsWAL() is false.  Each failure is reported with
                     :  * ereport(ERROR); the function returns normally only when every check
                     :  * passes.
      49             :  */
      50             : static void
      51          84 : check_publication_add_relation(Relation targetrel)
      52             : {
      53             :     /* Give more specific error for partitioned tables */
      54          84 :     if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
      55           4 :         ereport(ERROR,
      56             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
      57             :                  errmsg("\"%s\" is a partitioned table",
      58             :                         RelationGetRelationName(targetrel)),
      59             :                  errdetail("Adding partitioned tables to publications is not supported."),
      60             :                  errhint("You can add the table partitions individually.")));
      61             : 
      62             :     /* Must be table */
      63          80 :     if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
      64           8 :         ereport(ERROR,
      65             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      66             :                  errmsg("\"%s\" is not a table",
      67             :                         RelationGetRelationName(targetrel)),
      68             :                  errdetail("Only tables can be added to publications.")));
      69             : 
      70             :     /* Can't be system table */
      71          72 :     if (IsCatalogRelation(targetrel))
      72           0 :         ereport(ERROR,
      73             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      74             :                  errmsg("\"%s\" is a system table",
      75             :                         RelationGetRelationName(targetrel)),
      76             :                  errdetail("System tables cannot be added to publications.")));
      77             : 
      78             :     /* UNLOGGED and TEMP relations cannot be part of publication. */
      79          72 :     if (!RelationNeedsWAL(targetrel))
      80           0 :         ereport(ERROR,
      81             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      82             :                  errmsg("table \"%s\" cannot be replicated",
      83             :                         RelationGetRelationName(targetrel)),
      84             :                  errdetail("Temporary and unlogged relations cannot be replicated.")));
      85          72 : }
      86             : 
      87             : /*
      88             :  * Returns true if the relation represented by oid and Form_pg_class entry
      89             :  * is publishable.
      90             :  *
      91             :  * Does similar checks as check_publication_add_relation(), but does not need
      92             :  * the relation to be opened and also does not throw errors.
                     :  *
                     :  * A class is considered publishable only when all of these hold: its
                     :  * relkind is RELKIND_RELATION, IsCatalogRelationOid() is false for relid,
                     :  * its relpersistence is RELPERSISTENCE_PERMANENT, and relid is not below
                     :  * FirstNormalObjectId.
      93             :  *
      94             :  * XXX  This also excludes all tables with relid < FirstNormalObjectId,
      95             :  * ie all tables created during initdb.  This mainly affects the preinstalled
      96             :  * information_schema.  IsCatalogRelationOid() only excludes tables with
      97             :  * relid < FirstBootstrapObjectId, making that test rather redundant,
      98             :  * but really we should get rid of the FirstNormalObjectId test not
      99             :  * IsCatalogRelationOid.  We can't do so today because we don't want
     100             :  * information_schema tables to be considered publishable; but this test
     101             :  * is really inadequate for that, since the information_schema could be
     102             :  * dropped and reloaded and then it'll be considered publishable.  The best
     103             :  * long-term solution may be to add a "relispublishable" bool to pg_class,
     104             :  * and depend on that instead of OID checks.
     105             :  */
     106             : static bool
     107       16396 : is_publishable_class(Oid relid, Form_pg_class reltuple)
     108             : {
     109       32484 :     return reltuple->relkind == RELKIND_RELATION &&
     110       26802 :         !IsCatalogRelationOid(relid) &&
     111       37112 :         reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
     112             :         relid >= FirstNormalObjectId;
     113             : }
     114             : 
     115             : /*
     116             :  * Variant of is_publishable_class() taking an open Relation.
                     :  *
                     :  * Passes the relation's OID and its rd_rel (pg_class form) to
                     :  * is_publishable_class() and returns that result unchanged.
     117             :  */
     118             : bool
     119       13476 : is_publishable_relation(Relation rel)
     120             : {
     121       13476 :     return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
     122             : }
     123             : 
     124             : 
     125             : /*
     126             :  * SQL-callable variant of is_publishable_class(); argument 0 is the
                     :  * relation OID.
     127             :  *
     128             :  * This returns null when the relation does not exist.  This is intended to be
     129             :  * used for example in psql to avoid gratuitous errors when there are
     130             :  * concurrent catalog changes.
                     :  *
                     :  * The pg_class entry is fetched through the RELOID syscache and released
                     :  * again before the boolean result is returned.
     131             :  */
     132             : Datum
     133        1044 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
     134             : {
     135        1044 :     Oid         relid = PG_GETARG_OID(0);
     136             :     HeapTuple   tuple;
     137             :     bool        result;
     138             : 
     139        1044 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
     140        1044 :     if (!HeapTupleIsValid(tuple))
     141           0 :         PG_RETURN_NULL();
     142        1044 :     result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
     143        1044 :     ReleaseSysCache(tuple);
     144        1044 :     PG_RETURN_BOOL(result);
     145             : }
     146             : 
     147             : 
     148             : /*
     149             :  * Insert new publication / relation mapping.
                     :  *
                     :  * pubid is the publication's OID, targetrel the already-opened relation to
                     :  * add.  A new pg_publication_rel row is inserted, AUTO dependencies on both
                     :  * the publication and the relation are recorded for it, and targetrel's
                     :  * relcache entry is invalidated.
                     :  *
                     :  * Returns the address of the new pg_publication_rel row.  If the mapping
                     :  * is already present, returns InvalidObjectAddress when if_not_exists is
                     :  * true and raises ERRCODE_DUPLICATE_OBJECT otherwise.  Errors raised by
                     :  * check_publication_add_relation() propagate to the caller.
                     :  *
                     :  * Note: the Publication struct fetched via GetPublication() is used here
                     :  * only to supply the publication name for the duplicate-member error.
     150             :  */
     151             : ObjectAddress
     152          92 : publication_add_relation(Oid pubid, Relation targetrel,
     153             :                          bool if_not_exists)
     154             : {
     155             :     Relation    rel;
     156             :     HeapTuple   tup;
     157             :     Datum       values[Natts_pg_publication_rel];
     158             :     bool        nulls[Natts_pg_publication_rel];
     159          92 :     Oid         relid = RelationGetRelid(targetrel);
     160             :     Oid         prrelid;
     161          92 :     Publication *pub = GetPublication(pubid);
     162             :     ObjectAddress myself,
     163             :                 referenced;
     164             : 
     165          92 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
     166             : 
     167             :     /*
     168             :      * Check for duplicates. Note that this does not really prevent
     169             :      * duplicates, it's here just to provide nicer error message in common
     170             :      * case. The real protection is the unique key on the catalog.
     171             :      */
     172          92 :     if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     173             :                               ObjectIdGetDatum(pubid)))
     174             :     {
     175           8 :         table_close(rel, RowExclusiveLock);
     176             : 
     177           8 :         if (if_not_exists)
     178           4 :             return InvalidObjectAddress;
     179             : 
     180           4 :         ereport(ERROR,
     181             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     182             :                  errmsg("relation \"%s\" is already member of publication \"%s\"",
     183             :                         RelationGetRelationName(targetrel), pub->name)));
     184             :     }
     185             : 
     186          84 :     check_publication_add_relation(targetrel);
     187             : 
     188             :     /* Form a tuple. */
     189          72 :     memset(values, 0, sizeof(values));
     190          72 :     memset(nulls, false, sizeof(nulls));
     191             : 
     192          72 :     prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
     193             :                                  Anum_pg_publication_rel_oid);
     194          72 :     values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid);
     195          72 :     values[Anum_pg_publication_rel_prpubid - 1] =
     196          72 :         ObjectIdGetDatum(pubid);
     197          72 :     values[Anum_pg_publication_rel_prrelid - 1] =
     198          72 :         ObjectIdGetDatum(relid);
     199             : 
     200          72 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     201             : 
     202             :     /* Insert tuple into catalog. */
     203          72 :     CatalogTupleInsert(rel, tup);
     204          72 :     heap_freetuple(tup);
     205             : 
     206          72 :     ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
     207             : 
     208             :     /* Add dependency on the publication */
     209          72 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     210          72 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     211             : 
     212             :     /* Add dependency on the relation */
     213          72 :     ObjectAddressSet(referenced, RelationRelationId, relid);
     214          72 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     215             : 
     216             :     /* Close the table. */
     217          72 :     table_close(rel, RowExclusiveLock);
     218             : 
     219             :     /* Invalidate relcache so that publication info is rebuilt. */
     220          72 :     CacheInvalidateRelcache(targetrel);
     221             : 
     222          72 :     return myself;
     223             : }
     224             : 
     225             : 
     226             : /*
     227             :  * Gets list of publication oids for a relation oid.
                     :  *
                     :  * The lookup uses the PUBLICATIONRELMAP syscache list keyed on relid, so
                     :  * only publications with a pg_publication_rel entry for the relation are
                     :  * returned.  The result is NIL when there are no such entries.
     228             :  */
     229             : List *
     230        2426 : GetRelationPublications(Oid relid)
     231             : {
     232        2426 :     List       *result = NIL;
     233             :     CatCList   *pubrellist;
     234             :     int         i;
     235             : 
     236             :     /* Find all publications associated with the relation. */
     237        2426 :     pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
     238             :                                      ObjectIdGetDatum(relid));
     239        2466 :     for (i = 0; i < pubrellist->n_members; i++)
     240             :     {
     241          40 :         HeapTuple   tup = &pubrellist->members[i]->tuple;
     242          40 :         Oid         pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
     243             : 
     244          40 :         result = lappend_oid(result, pubid);
     245             :     }
     246             : 
     247        2426 :     ReleaseSysCacheList(pubrellist);
     248             : 
     249        2426 :     return result;
     250             : }
     251             : 
     252             : /*
     253             :  * Gets list of relation oids for a publication.
     254             :  *
     255             :  * This should only be used for normal publications; FOR ALL TABLES
     256             :  * publications should use GetAllTablesPublicationRelations().
                     :  *
                     :  * The result is built from the prrelid column of every pg_publication_rel
                     :  * row whose prpubid equals pubid, and is NIL when there are none.
     257             :  */
     258             : List *
     259          32 : GetPublicationRelations(Oid pubid)
     260             : {
     261             :     List       *result;
     262             :     Relation    pubrelsrel;
     263             :     ScanKeyData scankey;
     264             :     SysScanDesc scan;
     265             :     HeapTuple   tup;
     266             : 
     267             :     /* Find all relations associated with the publication. */
     268          32 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     269             : 
     270          32 :     ScanKeyInit(&scankey,
     271             :                 Anum_pg_publication_rel_prpubid,
     272             :                 BTEqualStrategyNumber, F_OIDEQ,
     273             :                 ObjectIdGetDatum(pubid));
     274             : 
     275          32 :     scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
     276             :                               true, NULL, 1, &scankey);
     277             : 
     278          32 :     result = NIL;
     279         100 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     280             :     {
     281             :         Form_pg_publication_rel pubrel;
     282             : 
     283          36 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     284             : 
     285          36 :         result = lappend_oid(result, pubrel->prrelid);
     286             :     }
     287             : 
     288          32 :     systable_endscan(scan);
     289          32 :     table_close(pubrelsrel, AccessShareLock);
     290             : 
     291          32 :     return result;
     292             : }
     293             : 
     294             : /*
     295             :  * Gets list of publication oids for publications marked as FOR ALL TABLES.
                     :  *
                     :  * pg_publication is scanned without an index (InvalidOid is passed as the
                     :  * index and index use is disabled), keeping rows whose puballtables column
                     :  * is true.  The result is NIL when no such publication exists.
     296             :  */
     297             : List *
     298        2330 : GetAllTablesPublications(void)
     299             : {
     300             :     List       *result;
     301             :     Relation    rel;
     302             :     ScanKeyData scankey;
     303             :     SysScanDesc scan;
     304             :     HeapTuple   tup;
     305             : 
     306             :     /* Find all publications that are marked as for all tables. */
     307        2330 :     rel = table_open(PublicationRelationId, AccessShareLock);
     308             : 
     309        2330 :     ScanKeyInit(&scankey,
     310             :                 Anum_pg_publication_puballtables,
     311             :                 BTEqualStrategyNumber, F_BOOLEQ,
     312             :                 BoolGetDatum(true));
     313             : 
     314        2330 :     scan = systable_beginscan(rel, InvalidOid, false,
     315             :                               NULL, 1, &scankey);
     316             : 
     317        2330 :     result = NIL;
     318        4660 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     319             :     {
     320           0 :         Oid         oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
     321             : 
     322           0 :         result = lappend_oid(result, oid);
     323             :     }
     324             : 
     325        2330 :     systable_endscan(scan);
     326        2330 :     table_close(rel, AccessShareLock);
     327             : 
     328        2330 :     return result;
     329             : }
     330             : 
     331             : /*
     332             :  * Gets list of all relations published by FOR ALL TABLES publication(s).
                     :  *
                     :  * Scans pg_class for rows with relkind RELKIND_RELATION and collects the
                     :  * OIDs of those for which is_publishable_class() returns true.
     333             :  */
     334             : List *
     335          26 : GetAllTablesPublicationRelations(void)
     336             : {
     337             :     Relation    classRel;
     338             :     ScanKeyData key[1];
     339             :     TableScanDesc scan;
     340             :     HeapTuple   tuple;
     341          26 :     List       *result = NIL;
     342             : 
     343          26 :     classRel = table_open(RelationRelationId, AccessShareLock);
     344             : 
     345          26 :     ScanKeyInit(&key[0],
     346             :                 Anum_pg_class_relkind,
     347             :                 BTEqualStrategyNumber, F_CHAREQ,
     348             :                 CharGetDatum(RELKIND_RELATION));
     349             : 
     350          26 :     scan = table_beginscan_catalog(classRel, 1, key);
     351             : 
     352        1928 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     353             :     {
     354        1876 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     355        1876 :         Oid         relid = relForm->oid;
     356             : 
     357        1876 :         if (is_publishable_class(relid, relForm))
     358          56 :             result = lappend_oid(result, relid);
     359             :     }
     360             : 
     361          26 :     table_endscan(scan);
     362          26 :     table_close(classRel, AccessShareLock);
     363             : 
     364          26 :     return result;
     365             : }
     366             : 
     367             : /*
     368             :  * Get publication using oid
     369             :  *
     370             :  * The Publication struct and its data are palloc'ed here.
                     :  *
                     :  * Raises an error (elog) if no publication with the given OID is found in
                     :  * the PUBLICATIONOID syscache.  The name is copied with pstrdup() and the
                     :  * remaining fields are copied by value, so the returned struct does not
                     :  * point into the syscache tuple, which is released before returning.
     371             :  */
     372             : Publication *
     373         174 : GetPublication(Oid pubid)
     374             : {
     375             :     HeapTuple   tup;
     376             :     Publication *pub;
     377             :     Form_pg_publication pubform;
     378             : 
     379         174 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
     380             : 
     381         174 :     if (!HeapTupleIsValid(tup))
     382           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
     383             : 
     384         174 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     385             : 
     386         174 :     pub = (Publication *) palloc(sizeof(Publication));
     387         174 :     pub->oid = pubid;
     388         174 :     pub->name = pstrdup(NameStr(pubform->pubname));
     389         174 :     pub->alltables = pubform->puballtables;
     390         174 :     pub->pubactions.pubinsert = pubform->pubinsert;
     391         174 :     pub->pubactions.pubupdate = pubform->pubupdate;
     392         174 :     pub->pubactions.pubdelete = pubform->pubdelete;
     393         174 :     pub->pubactions.pubtruncate = pubform->pubtruncate;
     394             : 
     395         174 :     ReleaseSysCache(tup);
     396             : 
     397         174 :     return pub;
     398             : }
     399             : 
     400             : 
     401             : /*
     402             :  * Get Publication using name.
                     :  *
                     :  * The name is resolved to an OID via the PUBLICATIONNAME syscache and the
                     :  * result of GetPublication() for that OID is returned.  If no publication
                     :  * of that name exists, returns NULL when missing_ok is true and raises
                     :  * ERRCODE_UNDEFINED_OBJECT otherwise.
     403             :  */
     404             : Publication *
     405          82 : GetPublicationByName(const char *pubname, bool missing_ok)
     406             : {
     407             :     Oid         oid;
     408             : 
     409          82 :     oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     410             :                           CStringGetDatum(pubname));
     411          82 :     if (!OidIsValid(oid))
     412             :     {
     413           0 :         if (missing_ok)
     414           0 :             return NULL;
     415             : 
     416           0 :         ereport(ERROR,
     417             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     418             :                  errmsg("publication \"%s\" does not exist", pubname)));
     419             :     }
     420             : 
     421          82 :     return GetPublication(oid);
     422             : }
     423             : 
     424             : /*
     425             :  * get_publication_oid - given a publication name, look up the OID
     426             :  *
     427             :  * If missing_ok is false, throw an error if name not found.  If true, just
     428             :  * return InvalidOid.
                     :  *
                     :  * The lookup goes through the PUBLICATIONNAME syscache.
     429             :  */
     430             : Oid
     431          68 : get_publication_oid(const char *pubname, bool missing_ok)
     432             : {
     433             :     Oid         oid;
     434             : 
     435          68 :     oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     436             :                           CStringGetDatum(pubname));
     437          68 :     if (!OidIsValid(oid) && !missing_ok)
     438           4 :         ereport(ERROR,
     439             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     440             :                  errmsg("publication \"%s\" does not exist", pubname)));
     441          64 :     return oid;
     442             : }
     443             : 
     444             : /*
     445             :  * get_publication_name - given a publication Oid, look up the name
     446             :  *
     447             :  * If missing_ok is false, throw an error if the publication is not found.
     448             :  * If true, just return NULL.
                     :  *
                     :  * The returned name is a pstrdup'ed copy; the syscache tuple it was read
                     :  * from is released before returning.
     449             :  */
     450             : char *
     451          94 : get_publication_name(Oid pubid, bool missing_ok)
     452             : {
     453             :     HeapTuple   tup;
     454             :     char       *pubname;
     455             :     Form_pg_publication pubform;
     456             : 
     457          94 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
     458             : 
     459          94 :     if (!HeapTupleIsValid(tup))
     460             :     {
     461           0 :         if (!missing_ok)
     462           0 :             elog(ERROR, "cache lookup failed for publication %u", pubid);
     463           0 :         return NULL;
     464             :     }
     465             : 
     466          94 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     467          94 :     pubname = pstrdup(NameStr(pubform->pubname));
     468             : 
     469          94 :     ReleaseSysCache(tup);
     470             : 
     471          94 :     return pubname;
     472             : }
     473             : 
     474             : /*
     475             :  * Returns Oids of tables in a publication.
                     :  *
                     :  * Set-returning function; argument 0 is the publication name.  On the
                     :  * first call the publication is looked up by name (erroring out if it does
                     :  * not exist) and the list of table OIDs is built in the multi-call memory
                     :  * context: GetAllTablesPublicationRelations() for a FOR ALL TABLES
                     :  * publication, GetPublicationRelations() otherwise.  Each call then returns
                     :  * the list element indexed by call_cntr until the list is exhausted.
                     :  *
                     :  * NOTE(review): pubname is converted with text_to_cstring() on every call
                     :  * but is only used inside the first-call block; the conversion could be
                     :  * moved there.
     476             :  */
     477             : Datum
     478         126 : pg_get_publication_tables(PG_FUNCTION_ARGS)
     479             : {
     480             :     FuncCallContext *funcctx;
     481         126 :     char       *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     482             :     Publication *publication;
     483             :     List       *tables;
     484             : 
     485             :     /* stuff done only on the first call of the function */
     486         126 :     if (SRF_IS_FIRSTCALL())
     487             :     {
     488             :         MemoryContext oldcontext;
     489             : 
     490             :         /* create a function context for cross-call persistence */
     491          42 :         funcctx = SRF_FIRSTCALL_INIT();
     492             : 
     493             :         /* switch to memory context appropriate for multiple function calls */
     494          42 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
     495             : 
     496          42 :         publication = GetPublicationByName(pubname, false);
     497          42 :         if (publication->alltables)
     498          26 :             tables = GetAllTablesPublicationRelations();
     499             :         else
     500          16 :             tables = GetPublicationRelations(publication->oid);
     501          42 :         funcctx->user_fctx = (void *) tables;
     502             : 
     503          42 :         MemoryContextSwitchTo(oldcontext);
     504             :     }
     505             : 
     506             :     /* stuff done on every call of the function */
     507         126 :     funcctx = SRF_PERCALL_SETUP();
     508         126 :     tables = (List *) funcctx->user_fctx;
     509             : 
     510         126 :     if (funcctx->call_cntr < list_length(tables))
     511             :     {
     512          84 :         Oid         relid = list_nth_oid(tables, funcctx->call_cntr);
     513             : 
     514          84 :         SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
     515             :     }
     516             : 
     517          42 :     SRF_RETURN_DONE(funcctx);
     518             : }

Generated by: LCOV version 1.13