LCOV - code coverage report
Current view: top level - src/backend/catalog - pg_publication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 162 171 94.7 %
Date: 2020-06-05 18:07:03 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_publication.c
       4             :  *      publication C API manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      pg_publication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/heapam.h"
      19             : #include "access/htup_details.h"
      20             : #include "access/tableam.h"
      21             : #include "access/xact.h"
      22             : #include "catalog/catalog.h"
      23             : #include "catalog/dependency.h"
      24             : #include "catalog/index.h"
      25             : #include "catalog/indexing.h"
      26             : #include "catalog/namespace.h"
      27             : #include "catalog/partition.h"
      28             : #include "catalog/objectaccess.h"
      29             : #include "catalog/objectaddress.h"
      30             : #include "catalog/pg_inherits.h"
      31             : #include "catalog/pg_publication.h"
      32             : #include "catalog/pg_publication_rel.h"
      33             : #include "catalog/pg_type.h"
      34             : #include "funcapi.h"
      35             : #include "miscadmin.h"
      36             : #include "utils/array.h"
      37             : #include "utils/builtins.h"
      38             : #include "utils/catcache.h"
      39             : #include "utils/fmgroids.h"
      40             : #include "utils/inval.h"
      41             : #include "utils/lsyscache.h"
      42             : #include "utils/rel.h"
      43             : #include "utils/syscache.h"
      44             : 
      45             : /*
      46             :  * Check if relation can be in given publication and throws appropriate
      47             :  * error if not.
      48             :  */
      49             : static void
      50          94 : check_publication_add_relation(Relation targetrel)
      51             : {
      52             :     /* Must be a regular or partitioned table */
      53          94 :     if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
      54          16 :         RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
      55           8 :         ereport(ERROR,
      56             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      57             :                  errmsg("\"%s\" is not a table",
      58             :                         RelationGetRelationName(targetrel)),
      59             :                  errdetail("Only tables can be added to publications.")));
      60             : 
      61             :     /* Can't be system table */
      62          86 :     if (IsCatalogRelation(targetrel))
      63           0 :         ereport(ERROR,
      64             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      65             :                  errmsg("\"%s\" is a system table",
      66             :                         RelationGetRelationName(targetrel)),
      67             :                  errdetail("System tables cannot be added to publications.")));
      68             : 
      69             :     /* UNLOGGED and TEMP relations cannot be part of publication. */
      70          86 :     if (!RelationNeedsWAL(targetrel))
      71           0 :         ereport(ERROR,
      72             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      73             :                  errmsg("table \"%s\" cannot be replicated",
      74             :                         RelationGetRelationName(targetrel)),
      75             :                  errdetail("Temporary and unlogged relations cannot be replicated.")));
      76          86 : }
      77             : 
      78             : /*
      79             :  * Returns if relation represented by oid and Form_pg_class entry
      80             :  * is publishable.
      81             :  *
      82             :  * Does same checks as the above, but does not need relation to be opened
      83             :  * and also does not throw errors.
      84             :  *
      85             :  * XXX  This also excludes all tables with relid < FirstNormalObjectId,
      86             :  * ie all tables created during initdb.  This mainly affects the preinstalled
      87             :  * information_schema.  IsCatalogRelationOid() only excludes tables with
      88             :  * relid < FirstBootstrapObjectId, making that test rather redundant,
      89             :  * but really we should get rid of the FirstNormalObjectId test not
      90             :  * IsCatalogRelationOid.  We can't do so today because we don't want
      91             :  * information_schema tables to be considered publishable; but this test
      92             :  * is really inadequate for that, since the information_schema could be
      93             :  * dropped and reloaded and then it'll be considered publishable.  The best
      94             :  * long-term solution may be to add a "relispublishable" bool to pg_class,
      95             :  * and depend on that instead of OID checks.
      96             :  */
      97             : static bool
      98       23122 : is_publishable_class(Oid relid, Form_pg_class reltuple)
      99             : {
     100       23454 :     return (reltuple->relkind == RELKIND_RELATION ||
     101         332 :             reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
     102       22950 :         !IsCatalogRelationOid(relid) &&
     103       46244 :         reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
     104             :         relid >= FirstNormalObjectId;
     105             : }
     106             : 
     107             : /*
     108             :  * Another variant of this, taking a Relation.
     109             :  */
     110             : bool
     111       19596 : is_publishable_relation(Relation rel)
     112             : {
     113       19596 :     return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
     114             : }
     115             : 
     116             : 
     117             : /*
     118             :  * SQL-callable variant of the above
     119             :  *
     120             :  * This returns null when the relation does not exist.  This is intended to be
     121             :  * used for example in psql to avoid gratuitous errors when there are
     122             :  * concurrent catalog changes.
     123             :  */
     124             : Datum
     125        1196 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
     126             : {
     127        1196 :     Oid         relid = PG_GETARG_OID(0);
     128             :     HeapTuple   tuple;
     129             :     bool        result;
     130             : 
     131        1196 :     tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
     132        1196 :     if (!HeapTupleIsValid(tuple))
     133           0 :         PG_RETURN_NULL();
     134        1196 :     result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
     135        1196 :     ReleaseSysCache(tuple);
     136        1196 :     PG_RETURN_BOOL(result);
     137             : }
     138             : 
     139             : 
     140             : /*
     141             :  * Insert new publication / relation mapping.
     142             :  */
     143             : ObjectAddress
     144         102 : publication_add_relation(Oid pubid, Relation targetrel,
     145             :                          bool if_not_exists)
     146             : {
     147             :     Relation    rel;
     148             :     HeapTuple   tup;
     149             :     Datum       values[Natts_pg_publication_rel];
     150             :     bool        nulls[Natts_pg_publication_rel];
     151         102 :     Oid         relid = RelationGetRelid(targetrel);
     152             :     Oid         prrelid;
     153         102 :     Publication *pub = GetPublication(pubid);
     154             :     ObjectAddress myself,
     155             :                 referenced;
     156             : 
     157         102 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
     158             : 
     159             :     /*
     160             :      * Check for duplicates. Note that this does not really prevent
     161             :      * duplicates, it's here just to provide nicer error message in common
     162             :      * case. The real protection is the unique key on the catalog.
     163             :      */
     164         102 :     if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
     165             :                               ObjectIdGetDatum(pubid)))
     166             :     {
     167           8 :         table_close(rel, RowExclusiveLock);
     168             : 
     169           8 :         if (if_not_exists)
     170           4 :             return InvalidObjectAddress;
     171             : 
     172           4 :         ereport(ERROR,
     173             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     174             :                  errmsg("relation \"%s\" is already member of publication \"%s\"",
     175             :                         RelationGetRelationName(targetrel), pub->name)));
     176             :     }
     177             : 
     178          94 :     check_publication_add_relation(targetrel);
     179             : 
     180             :     /* Form a tuple. */
     181          86 :     memset(values, 0, sizeof(values));
     182          86 :     memset(nulls, false, sizeof(nulls));
     183             : 
     184          86 :     prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
     185             :                                  Anum_pg_publication_rel_oid);
     186          86 :     values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid);
     187          86 :     values[Anum_pg_publication_rel_prpubid - 1] =
     188          86 :         ObjectIdGetDatum(pubid);
     189          86 :     values[Anum_pg_publication_rel_prrelid - 1] =
     190          86 :         ObjectIdGetDatum(relid);
     191             : 
     192          86 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     193             : 
     194             :     /* Insert tuple into catalog. */
     195          86 :     CatalogTupleInsert(rel, tup);
     196          86 :     heap_freetuple(tup);
     197             : 
     198          86 :     ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
     199             : 
     200             :     /* Add dependency on the publication */
     201          86 :     ObjectAddressSet(referenced, PublicationRelationId, pubid);
     202          86 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     203             : 
     204             :     /* Add dependency on the relation */
     205          86 :     ObjectAddressSet(referenced, RelationRelationId, relid);
     206          86 :     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
     207             : 
     208             :     /* Close the table. */
     209          86 :     table_close(rel, RowExclusiveLock);
     210             : 
     211             :     /* Invalidate relcache so that publication info is rebuilt. */
     212          86 :     CacheInvalidateRelcache(targetrel);
     213             : 
     214          86 :     return myself;
     215             : }
     216             : 
     217             : /* Gets list of publication oids for a relation */
     218             : List *
     219        3526 : GetRelationPublications(Oid relid)
     220             : {
     221        3526 :     List       *result = NIL;
     222             :     CatCList   *pubrellist;
     223             :     int         i;
     224             : 
     225             :     /* Find all publications associated with the relation. */
     226        3526 :     pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
     227             :                                      ObjectIdGetDatum(relid));
     228        3582 :     for (i = 0; i < pubrellist->n_members; i++)
     229             :     {
     230          56 :         HeapTuple   tup = &pubrellist->members[i]->tuple;
     231          56 :         Oid         pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
     232             : 
     233          56 :         result = lappend_oid(result, pubid);
     234             :     }
     235             : 
     236        3526 :     ReleaseSysCacheList(pubrellist);
     237             : 
     238        3526 :     return result;
     239             : }
     240             : 
     241             : /*
     242             :  * Gets list of relation oids for a publication.
     243             :  *
     244             :  * This should only be used for normal publications, the FOR ALL TABLES
     245             :  * should use GetAllTablesPublicationRelations().
     246             :  */
     247             : List *
     248          42 : GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
     249             : {
     250             :     List       *result;
     251             :     Relation    pubrelsrel;
     252             :     ScanKeyData scankey;
     253             :     SysScanDesc scan;
     254             :     HeapTuple   tup;
     255             : 
     256             :     /* Find all publications associated with the relation. */
     257          42 :     pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
     258             : 
     259          42 :     ScanKeyInit(&scankey,
     260             :                 Anum_pg_publication_rel_prpubid,
     261             :                 BTEqualStrategyNumber, F_OIDEQ,
     262             :                 ObjectIdGetDatum(pubid));
     263             : 
     264          42 :     scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
     265             :                               true, NULL, 1, &scankey);
     266             : 
     267          42 :     result = NIL;
     268          92 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     269             :     {
     270             :         Form_pg_publication_rel pubrel;
     271             : 
     272          50 :         pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     273             : 
     274          50 :         if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
     275             :             pub_partopt != PUBLICATION_PART_ROOT)
     276           6 :         {
     277           6 :             List       *all_parts = find_all_inheritors(pubrel->prrelid, NoLock,
     278             :                                                         NULL);
     279             : 
     280           6 :             if (pub_partopt == PUBLICATION_PART_ALL)
     281           4 :                 result = list_concat(result, all_parts);
     282           2 :             else if (pub_partopt == PUBLICATION_PART_LEAF)
     283             :             {
     284             :                 ListCell   *lc;
     285             : 
     286          10 :                 foreach(lc, all_parts)
     287             :                 {
     288           8 :                     Oid         partOid = lfirst_oid(lc);
     289             : 
     290           8 :                     if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
     291           6 :                         result = lappend_oid(result, partOid);
     292             :                 }
     293             :             }
     294             :             else
     295             :                 Assert(false);
     296             :         }
     297             :         else
     298          44 :             result = lappend_oid(result, pubrel->prrelid);
     299             :     }
     300             : 
     301          42 :     systable_endscan(scan);
     302          42 :     table_close(pubrelsrel, AccessShareLock);
     303             : 
     304          42 :     return result;
     305             : }
     306             : 
     307             : /*
     308             :  * Gets list of publication oids for publications marked as FOR ALL TABLES.
     309             :  */
     310             : List *
     311        2454 : GetAllTablesPublications(void)
     312             : {
     313             :     List       *result;
     314             :     Relation    rel;
     315             :     ScanKeyData scankey;
     316             :     SysScanDesc scan;
     317             :     HeapTuple   tup;
     318             : 
     319             :     /* Find all publications that are marked as for all tables. */
     320        2454 :     rel = table_open(PublicationRelationId, AccessShareLock);
     321             : 
     322        2454 :     ScanKeyInit(&scankey,
     323             :                 Anum_pg_publication_puballtables,
     324             :                 BTEqualStrategyNumber, F_BOOLEQ,
     325             :                 BoolGetDatum(true));
     326             : 
     327        2454 :     scan = systable_beginscan(rel, InvalidOid, false,
     328             :                               NULL, 1, &scankey);
     329             : 
     330        2454 :     result = NIL;
     331        2454 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     332             :     {
     333           0 :         Oid         oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
     334             : 
     335           0 :         result = lappend_oid(result, oid);
     336             :     }
     337             : 
     338        2454 :     systable_endscan(scan);
     339        2454 :     table_close(rel, AccessShareLock);
     340             : 
     341        2454 :     return result;
     342             : }
     343             : 
     344             : /*
     345             :  * Gets list of all relation published by FOR ALL TABLES publication(s).
     346             :  *
     347             :  * If the publication publishes partition changes via their respective root
     348             :  * partitioned tables, we must exclude partitions in favor of including the
     349             :  * root partitioned tables.
     350             :  */
     351             : List *
     352          34 : GetAllTablesPublicationRelations(bool pubviaroot)
     353             : {
     354             :     Relation    classRel;
     355             :     ScanKeyData key[1];
     356             :     TableScanDesc scan;
     357             :     HeapTuple   tuple;
     358          34 :     List       *result = NIL;
     359             : 
     360          34 :     classRel = table_open(RelationRelationId, AccessShareLock);
     361             : 
     362          34 :     ScanKeyInit(&key[0],
     363             :                 Anum_pg_class_relkind,
     364             :                 BTEqualStrategyNumber, F_CHAREQ,
     365             :                 CharGetDatum(RELKIND_RELATION));
     366             : 
     367          34 :     scan = table_beginscan_catalog(classRel, 1, key);
     368             : 
     369        2358 :     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     370             :     {
     371        2324 :         Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     372        2324 :         Oid         relid = relForm->oid;
     373             : 
     374        2324 :         if (is_publishable_class(relid, relForm) &&
     375          80 :             !(relForm->relispartition && pubviaroot))
     376          68 :             result = lappend_oid(result, relid);
     377             :     }
     378             : 
     379          34 :     table_endscan(scan);
     380             : 
     381          34 :     if (pubviaroot)
     382             :     {
     383           2 :         ScanKeyInit(&key[0],
     384             :                     Anum_pg_class_relkind,
     385             :                     BTEqualStrategyNumber, F_CHAREQ,
     386             :                     CharGetDatum(RELKIND_PARTITIONED_TABLE));
     387             : 
     388           2 :         scan = table_beginscan_catalog(classRel, 1, key);
     389             : 
     390           8 :         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
     391             :         {
     392           6 :             Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
     393           6 :             Oid         relid = relForm->oid;
     394             : 
     395           6 :             if (is_publishable_class(relid, relForm) &&
     396           6 :                 !relForm->relispartition)
     397           6 :                 result = lappend_oid(result, relid);
     398             :         }
     399             : 
     400           2 :         table_endscan(scan);
     401             :     }
     402             : 
     403          34 :     table_close(classRel, AccessShareLock);
     404          34 :     return result;
     405             : }
     406             : 
     407             : /*
     408             :  * Get publication using oid
     409             :  *
     410             :  * The Publication struct and its data are palloc'ed here.
     411             :  */
     412             : Publication *
     413         196 : GetPublication(Oid pubid)
     414             : {
     415             :     HeapTuple   tup;
     416             :     Publication *pub;
     417             :     Form_pg_publication pubform;
     418             : 
     419         196 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
     420         196 :     if (!HeapTupleIsValid(tup))
     421           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
     422             : 
     423         196 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     424             : 
     425         196 :     pub = (Publication *) palloc(sizeof(Publication));
     426         196 :     pub->oid = pubid;
     427         196 :     pub->name = pstrdup(NameStr(pubform->pubname));
     428         196 :     pub->alltables = pubform->puballtables;
     429         196 :     pub->pubactions.pubinsert = pubform->pubinsert;
     430         196 :     pub->pubactions.pubupdate = pubform->pubupdate;
     431         196 :     pub->pubactions.pubdelete = pubform->pubdelete;
     432         196 :     pub->pubactions.pubtruncate = pubform->pubtruncate;
     433         196 :     pub->pubviaroot = pubform->pubviaroot;
     434             : 
     435         196 :     ReleaseSysCache(tup);
     436             : 
     437         196 :     return pub;
     438             : }
     439             : 
     440             : 
     441             : /*
     442             :  * Get Publication using name.
     443             :  */
     444             : Publication *
     445          94 : GetPublicationByName(const char *pubname, bool missing_ok)
     446             : {
     447             :     Oid         oid;
     448             : 
     449          94 :     oid = get_publication_oid(pubname, missing_ok);
     450             : 
     451          94 :     return OidIsValid(oid) ? GetPublication(oid) : NULL;
     452             : }
     453             : 
     454             : /*
     455             :  * get_publication_oid - given a publication name, look up the OID
     456             :  *
     457             :  * If missing_ok is false, throw an error if name not found.  If true, just
     458             :  * return InvalidOid.
     459             :  */
     460             : Oid
     461         174 : get_publication_oid(const char *pubname, bool missing_ok)
     462             : {
     463             :     Oid         oid;
     464             : 
     465         174 :     oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     466             :                           CStringGetDatum(pubname));
     467         174 :     if (!OidIsValid(oid) && !missing_ok)
     468           4 :         ereport(ERROR,
     469             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     470             :                  errmsg("publication \"%s\" does not exist", pubname)));
     471         170 :     return oid;
     472             : }
     473             : 
     474             : /*
     475             :  * get_publication_name - given a publication Oid, look up the name
     476             :  *
     477             :  * If missing_ok is false, throw an error if name not found.  If true, just
     478             :  * return NULL.
     479             :  */
     480             : char *
     481         102 : get_publication_name(Oid pubid, bool missing_ok)
     482             : {
     483             :     HeapTuple   tup;
     484             :     char       *pubname;
     485             :     Form_pg_publication pubform;
     486             : 
     487         102 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
     488             : 
     489         102 :     if (!HeapTupleIsValid(tup))
     490             :     {
     491           0 :         if (!missing_ok)
     492           0 :             elog(ERROR, "cache lookup failed for publication %u", pubid);
     493           0 :         return NULL;
     494             :     }
     495             : 
     496         102 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     497         102 :     pubname = pstrdup(NameStr(pubform->pubname));
     498             : 
     499         102 :     ReleaseSysCache(tup);
     500             : 
     501         102 :     return pubname;
     502             : }
     503             : 
     504             : /*
     505             :  * Returns Oids of tables in a publication.
     506             :  */
     507             : Datum
     508         168 : pg_get_publication_tables(PG_FUNCTION_ARGS)
     509             : {
     510             :     FuncCallContext *funcctx;
     511         168 :     char       *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     512             :     Publication *publication;
     513             :     List       *tables;
     514             : 
     515             :     /* stuff done only on the first call of the function */
     516         168 :     if (SRF_IS_FIRSTCALL())
     517             :     {
     518             :         MemoryContext oldcontext;
     519             : 
     520             :         /* create a function context for cross-call persistence */
     521          52 :         funcctx = SRF_FIRSTCALL_INIT();
     522             : 
     523             :         /* switch to memory context appropriate for multiple function calls */
     524          52 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
     525             : 
     526          52 :         publication = GetPublicationByName(pubname, false);
     527             : 
     528             :         /*
     529             :          * Publications support partitioned tables, although all changes are
     530             :          * replicated using leaf partition identity and schema, so we only
     531             :          * need those.
     532             :          */
     533          52 :         if (publication->alltables)
     534          34 :             tables = GetAllTablesPublicationRelations(publication->pubviaroot);
     535             :         else
     536          18 :             tables = GetPublicationRelations(publication->oid,
     537          18 :                                              publication->pubviaroot ?
     538          18 :                                              PUBLICATION_PART_ROOT :
     539             :                                              PUBLICATION_PART_LEAF);
     540          52 :         funcctx->user_fctx = (void *) tables;
     541             : 
     542          52 :         MemoryContextSwitchTo(oldcontext);
     543             :     }
     544             : 
     545             :     /* stuff done on every call of the function */
     546         168 :     funcctx = SRF_PERCALL_SETUP();
     547         168 :     tables = (List *) funcctx->user_fctx;
     548             : 
     549         168 :     if (funcctx->call_cntr < list_length(tables))
     550             :     {
     551         116 :         Oid         relid = list_nth_oid(tables, funcctx->call_cntr);
     552             : 
     553         116 :         SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
     554             :     }
     555             : 
     556          52 :     SRF_RETURN_DONE(funcctx);
     557             : }

Generated by: LCOV version 1.13