LCOV - code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 241 276 87.3 %
Date: 2020-06-01 09:07:10 Functions: 13 14 92.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * publicationcmds.c
       4             :  *      publication manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      publicationcmds.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/htup_details.h"
      19             : #include "access/table.h"
      20             : #include "access/xact.h"
      21             : #include "catalog/catalog.h"
      22             : #include "catalog/indexing.h"
      23             : #include "catalog/namespace.h"
      24             : #include "catalog/objectaccess.h"
      25             : #include "catalog/objectaddress.h"
      26             : #include "catalog/partition.h"
      27             : #include "catalog/pg_inherits.h"
      28             : #include "catalog/pg_publication.h"
      29             : #include "catalog/pg_publication_rel.h"
      30             : #include "catalog/pg_type.h"
      31             : #include "commands/dbcommands.h"
      32             : #include "commands/defrem.h"
      33             : #include "commands/event_trigger.h"
      34             : #include "commands/publicationcmds.h"
      35             : #include "funcapi.h"
      36             : #include "miscadmin.h"
      37             : #include "utils/acl.h"
      38             : #include "utils/array.h"
      39             : #include "utils/builtins.h"
      40             : #include "utils/catcache.h"
      41             : #include "utils/fmgroids.h"
      42             : #include "utils/inval.h"
      43             : #include "utils/lsyscache.h"
      44             : #include "utils/rel.h"
      45             : #include "utils/syscache.h"
      46             : #include "utils/varlena.h"
      47             : 
      48             : /* Same as MAXNUMMESSAGES in sinvaladt.c */
      49             : #define MAX_RELCACHE_INVAL_MSGS 4096
      50             : 
      51             : static List *OpenTableList(List *tables);
      52             : static void CloseTableList(List *rels);
      53             : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
      54             :                                  AlterPublicationStmt *stmt);
      55             : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
      56             : 
      57             : static void
      58         126 : parse_publication_options(List *options,
      59             :                           bool *publish_given,
      60             :                           PublicationActions *pubactions,
      61             :                           bool *publish_via_partition_root_given,
      62             :                           bool *publish_via_partition_root)
      63             : {
      64             :     ListCell   *lc;
      65             : 
      66         126 :     *publish_given = false;
      67         126 :     *publish_via_partition_root_given = false;
      68             : 
      69             :     /* defaults */
      70         126 :     pubactions->pubinsert = true;
      71         126 :     pubactions->pubupdate = true;
      72         126 :     pubactions->pubdelete = true;
      73         126 :     pubactions->pubtruncate = true;
      74         126 :     *publish_via_partition_root = false;
      75             : 
      76             :     /* Parse options */
      77         172 :     foreach(lc, options)
      78             :     {
      79          58 :         DefElem    *defel = (DefElem *) lfirst(lc);
      80             : 
      81          58 :         if (strcmp(defel->defname, "publish") == 0)
      82             :         {
      83             :             char       *publish;
      84             :             List       *publish_list;
      85             :             ListCell   *lc;
      86             : 
      87          38 :             if (*publish_given)
      88           0 :                 ereport(ERROR,
      89             :                         (errcode(ERRCODE_SYNTAX_ERROR),
      90             :                          errmsg("conflicting or redundant options")));
      91             : 
      92             :             /*
      93             :              * If publish option was given only the explicitly listed actions
      94             :              * should be published.
      95             :              */
      96          38 :             pubactions->pubinsert = false;
      97          38 :             pubactions->pubupdate = false;
      98          38 :             pubactions->pubdelete = false;
      99          38 :             pubactions->pubtruncate = false;
     100             : 
     101          38 :             *publish_given = true;
     102          38 :             publish = defGetString(defel);
     103             : 
     104          38 :             if (!SplitIdentifierString(publish, ',', &publish_list))
     105           0 :                 ereport(ERROR,
     106             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     107             :                          errmsg("invalid list syntax for \"publish\" option")));
     108             : 
     109             :             /* Process the option list. */
     110          86 :             foreach(lc, publish_list)
     111             :             {
     112          52 :                 char       *publish_opt = (char *) lfirst(lc);
     113             : 
     114          52 :                 if (strcmp(publish_opt, "insert") == 0)
     115          28 :                     pubactions->pubinsert = true;
     116          24 :                 else if (strcmp(publish_opt, "update") == 0)
     117          12 :                     pubactions->pubupdate = true;
     118          12 :                 else if (strcmp(publish_opt, "delete") == 0)
     119           6 :                     pubactions->pubdelete = true;
     120           6 :                 else if (strcmp(publish_opt, "truncate") == 0)
     121           2 :                     pubactions->pubtruncate = true;
     122             :                 else
     123           4 :                     ereport(ERROR,
     124             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     125             :                              errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
     126             :             }
     127             :         }
     128          20 :         else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
     129             :         {
     130          16 :             if (*publish_via_partition_root_given)
     131           4 :                 ereport(ERROR,
     132             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     133             :                          errmsg("conflicting or redundant options")));
     134          12 :             *publish_via_partition_root_given = true;
     135          12 :             *publish_via_partition_root = defGetBoolean(defel);
     136             :         }
     137             :         else
     138           4 :             ereport(ERROR,
     139             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     140             :                      errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
     141             :     }
     142         114 : }
     143             : 
     144             : /*
     145             :  * Create new publication.
     146             :  */
     147             : ObjectAddress
     148         108 : CreatePublication(CreatePublicationStmt *stmt)
     149             : {
     150             :     Relation    rel;
     151             :     ObjectAddress myself;
     152             :     Oid         puboid;
     153             :     bool        nulls[Natts_pg_publication];
     154             :     Datum       values[Natts_pg_publication];
     155             :     HeapTuple   tup;
     156             :     bool        publish_given;
     157             :     PublicationActions pubactions;
     158             :     bool        publish_via_partition_root_given;
     159             :     bool        publish_via_partition_root;
     160             :     AclResult   aclresult;
     161             : 
     162             :     /* must have CREATE privilege on database */
     163         108 :     aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
     164         108 :     if (aclresult != ACLCHECK_OK)
     165           4 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     166           4 :                        get_database_name(MyDatabaseId));
     167             : 
     168             :     /* FOR ALL TABLES requires superuser */
     169         104 :     if (stmt->for_all_tables && !superuser())
     170           0 :         ereport(ERROR,
     171             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     172             :                  errmsg("must be superuser to create FOR ALL TABLES publication")));
     173             : 
     174         104 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     175             : 
     176             :     /* Check if name is used */
     177         104 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     178             :                              CStringGetDatum(stmt->pubname));
     179         104 :     if (OidIsValid(puboid))
     180             :     {
     181           4 :         ereport(ERROR,
     182             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     183             :                  errmsg("publication \"%s\" already exists",
     184             :                         stmt->pubname)));
     185             :     }
     186             : 
     187             :     /* Form a tuple. */
     188         100 :     memset(values, 0, sizeof(values));
     189         100 :     memset(nulls, false, sizeof(nulls));
     190             : 
     191         100 :     values[Anum_pg_publication_pubname - 1] =
     192         100 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
     193         100 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
     194             : 
     195         100 :     parse_publication_options(stmt->options,
     196             :                               &publish_given, &pubactions,
     197             :                               &publish_via_partition_root_given,
     198             :                               &publish_via_partition_root);
     199             : 
     200          88 :     puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
     201             :                                 Anum_pg_publication_oid);
     202          88 :     values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
     203          88 :     values[Anum_pg_publication_puballtables - 1] =
     204          88 :         BoolGetDatum(stmt->for_all_tables);
     205          88 :     values[Anum_pg_publication_pubinsert - 1] =
     206          88 :         BoolGetDatum(pubactions.pubinsert);
     207          88 :     values[Anum_pg_publication_pubupdate - 1] =
     208          88 :         BoolGetDatum(pubactions.pubupdate);
     209          88 :     values[Anum_pg_publication_pubdelete - 1] =
     210          88 :         BoolGetDatum(pubactions.pubdelete);
     211          88 :     values[Anum_pg_publication_pubtruncate - 1] =
     212          88 :         BoolGetDatum(pubactions.pubtruncate);
     213          88 :     values[Anum_pg_publication_pubviaroot - 1] =
     214          88 :         BoolGetDatum(publish_via_partition_root);
     215             : 
     216          88 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     217             : 
     218             :     /* Insert tuple into catalog. */
     219          88 :     CatalogTupleInsert(rel, tup);
     220          88 :     heap_freetuple(tup);
     221             : 
     222          88 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
     223             : 
     224          88 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
     225             : 
     226             :     /* Make the changes visible. */
     227          88 :     CommandCounterIncrement();
     228             : 
     229          88 :     if (stmt->tables)
     230             :     {
     231             :         List       *rels;
     232             : 
     233             :         Assert(list_length(stmt->tables) > 0);
     234             : 
     235          28 :         rels = OpenTableList(stmt->tables);
     236          28 :         PublicationAddTables(puboid, rels, true, NULL);
     237          24 :         CloseTableList(rels);
     238             :     }
     239             : 
     240          84 :     table_close(rel, RowExclusiveLock);
     241             : 
     242          84 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
     243             : 
     244          84 :     if (wal_level != WAL_LEVEL_LOGICAL)
     245             :     {
     246          46 :         ereport(WARNING,
     247             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     248             :                  errmsg("wal_level is insufficient to publish logical changes"),
     249             :                  errhint("Set wal_level to logical before creating subscriptions.")));
     250             :     }
     251             : 
     252          84 :     return myself;
     253             : }
     254             : 
     255             : /*
     256             :  * Change options of a publication.
     257             :  */
     258             : static void
     259          26 : AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
     260             :                         HeapTuple tup)
     261             : {
     262             :     bool        nulls[Natts_pg_publication];
     263             :     bool        replaces[Natts_pg_publication];
     264             :     Datum       values[Natts_pg_publication];
     265             :     bool        publish_given;
     266             :     PublicationActions pubactions;
     267             :     bool        publish_via_partition_root_given;
     268             :     bool        publish_via_partition_root;
     269             :     ObjectAddress obj;
     270             :     Form_pg_publication pubform;
     271             : 
     272          26 :     parse_publication_options(stmt->options,
     273             :                               &publish_given, &pubactions,
     274             :                               &publish_via_partition_root_given,
     275             :                               &publish_via_partition_root);
     276             : 
     277             :     /* Everything ok, form a new tuple. */
     278          26 :     memset(values, 0, sizeof(values));
     279          26 :     memset(nulls, false, sizeof(nulls));
     280          26 :     memset(replaces, false, sizeof(replaces));
     281             : 
     282          26 :     if (publish_given)
     283             :     {
     284          20 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
     285          20 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
     286             : 
     287          20 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
     288          20 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
     289             : 
     290          20 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
     291          20 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
     292             : 
     293          20 :         values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
     294          20 :         replaces[Anum_pg_publication_pubtruncate - 1] = true;
     295             :     }
     296             : 
     297          26 :     if (publish_via_partition_root_given)
     298             :     {
     299           6 :         values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
     300           6 :         replaces[Anum_pg_publication_pubviaroot - 1] = true;
     301             :     }
     302             : 
     303          26 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
     304             :                             replaces);
     305             : 
     306             :     /* Update the catalog. */
     307          26 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     308             : 
     309          26 :     CommandCounterIncrement();
     310             : 
     311          26 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     312             : 
     313             :     /* Invalidate the relcache. */
     314          26 :     if (pubform->puballtables)
     315             :     {
     316           6 :         CacheInvalidateRelcacheAll();
     317             :     }
     318             :     else
     319             :     {
     320             :         /*
     321             :          * For any partitioned tables contained in the publication, we must
     322             :          * invalidate all partitions contained in the respective partition
     323             :          * trees, not just those explicitly mentioned in the publication.
     324             :          */
     325          20 :         List       *relids = GetPublicationRelations(pubform->oid,
     326             :                                                      PUBLICATION_PART_ALL);
     327             : 
     328             :         /*
     329             :          * We don't want to send too many individual messages, at some point
     330             :          * it's cheaper to just reset whole relcache.
     331             :          */
     332          20 :         if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
     333             :         {
     334             :             ListCell   *lc;
     335             : 
     336          28 :             foreach(lc, relids)
     337             :             {
     338           8 :                 Oid         relid = lfirst_oid(lc);
     339             : 
     340           8 :                 CacheInvalidateRelcacheByRelid(relid);
     341             :             }
     342             :         }
     343             :         else
     344           0 :             CacheInvalidateRelcacheAll();
     345             :     }
     346             : 
     347          26 :     ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
     348          26 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
     349             :                                      (Node *) stmt);
     350             : 
     351          26 :     InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
     352          26 : }
     353             : 
     354             : /*
     355             :  * Add or remove table to/from publication.
     356             :  */
     357             : static void
     358          68 : AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
     359             :                        HeapTuple tup)
     360             : {
     361          68 :     List       *rels = NIL;
     362          68 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
     363          68 :     Oid         pubid = pubform->oid;
     364             : 
     365             :     /* Check that user is allowed to manipulate the publication tables. */
     366          68 :     if (pubform->puballtables)
     367          12 :         ereport(ERROR,
     368             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     369             :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
     370             :                         NameStr(pubform->pubname)),
     371             :                  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
     372             : 
     373             :     Assert(list_length(stmt->tables) > 0);
     374             : 
     375          56 :     rels = OpenTableList(stmt->tables);
     376             : 
     377          56 :     if (stmt->tableAction == DEFELEM_ADD)
     378          44 :         PublicationAddTables(pubid, rels, false, stmt);
     379          12 :     else if (stmt->tableAction == DEFELEM_DROP)
     380           8 :         PublicationDropTables(pubid, rels, false);
     381             :     else                        /* DEFELEM_SET */
     382             :     {
     383           4 :         List       *oldrelids = GetPublicationRelations(pubid,
     384             :                                                         PUBLICATION_PART_ROOT);
     385           4 :         List       *delrels = NIL;
     386             :         ListCell   *oldlc;
     387             : 
     388             :         /* Calculate which relations to drop. */
     389           8 :         foreach(oldlc, oldrelids)
     390             :         {
     391           4 :             Oid         oldrelid = lfirst_oid(oldlc);
     392             :             ListCell   *newlc;
     393           4 :             bool        found = false;
     394             : 
     395           4 :             foreach(newlc, rels)
     396             :             {
     397           4 :                 Relation    newrel = (Relation) lfirst(newlc);
     398             : 
     399           4 :                 if (RelationGetRelid(newrel) == oldrelid)
     400             :                 {
     401           4 :                     found = true;
     402           4 :                     break;
     403             :                 }
     404             :             }
     405             : 
     406           4 :             if (!found)
     407             :             {
     408           0 :                 Relation    oldrel = table_open(oldrelid,
     409             :                                                 ShareUpdateExclusiveLock);
     410             : 
     411           0 :                 delrels = lappend(delrels, oldrel);
     412             :             }
     413             :         }
     414             : 
     415             :         /* And drop them. */
     416           4 :         PublicationDropTables(pubid, delrels, true);
     417             : 
     418             :         /*
     419             :          * Don't bother calculating the difference for adding, we'll catch and
     420             :          * skip existing ones when doing catalog update.
     421             :          */
     422           4 :         PublicationAddTables(pubid, rels, true, stmt);
     423             : 
     424           4 :         CloseTableList(delrels);
     425             :     }
     426             : 
     427          40 :     CloseTableList(rels);
     428          40 : }
     429             : 
     430             : /*
     431             :  * Alter the existing publication.
     432             :  *
     433             :  * This is dispatcher function for AlterPublicationOptions and
     434             :  * AlterPublicationTables.
     435             :  */
     436             : void
     437          94 : AlterPublication(AlterPublicationStmt *stmt)
     438             : {
     439             :     Relation    rel;
     440             :     HeapTuple   tup;
     441             :     Form_pg_publication pubform;
     442             : 
     443          94 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     444             : 
     445          94 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
     446             :                               CStringGetDatum(stmt->pubname));
     447             : 
     448          94 :     if (!HeapTupleIsValid(tup))
     449           0 :         ereport(ERROR,
     450             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     451             :                  errmsg("publication \"%s\" does not exist",
     452             :                         stmt->pubname)));
     453             : 
     454          94 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     455             : 
     456             :     /* must be owner */
     457          94 :     if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
     458           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
     459           0 :                        stmt->pubname);
     460             : 
     461          94 :     if (stmt->options)
     462          26 :         AlterPublicationOptions(stmt, rel, tup);
     463             :     else
     464          68 :         AlterPublicationTables(stmt, rel, tup);
     465             : 
     466             :     /* Cleanup. */
     467          66 :     heap_freetuple(tup);
     468          66 :     table_close(rel, RowExclusiveLock);
     469          66 : }
     470             : 
     471             : /*
     472             :  * Drop publication by OID
     473             :  */
     474             : void
     475          46 : RemovePublicationById(Oid pubid)
     476             : {
     477             :     Relation    rel;
     478             :     HeapTuple   tup;
     479             : 
     480          46 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     481             : 
     482          46 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
     483             : 
     484          46 :     if (!HeapTupleIsValid(tup))
     485           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
     486             : 
     487          46 :     CatalogTupleDelete(rel, &tup->t_self);
     488             : 
     489          46 :     ReleaseSysCache(tup);
     490             : 
     491          46 :     table_close(rel, RowExclusiveLock);
     492          46 : }
     493             : 
     494             : /*
     495             :  * Remove relation from publication by mapping OID.
     496             :  */
     497             : void
     498          54 : RemovePublicationRelById(Oid proid)
     499             : {
     500             :     Relation    rel;
     501             :     HeapTuple   tup;
     502             :     Form_pg_publication_rel pubrel;
     503             : 
     504          54 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
     505             : 
     506          54 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
     507             : 
     508          54 :     if (!HeapTupleIsValid(tup))
     509           0 :         elog(ERROR, "cache lookup failed for publication table %u",
     510             :              proid);
     511             : 
     512          54 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
     513             : 
     514             :     /* Invalidate relcache so that publication info is rebuilt. */
     515          54 :     CacheInvalidateRelcacheByRelid(pubrel->prrelid);
     516             : 
     517          54 :     CatalogTupleDelete(rel, &tup->t_self);
     518             : 
     519          54 :     ReleaseSysCache(tup);
     520             : 
     521          54 :     table_close(rel, RowExclusiveLock);
     522          54 : }
     523             : 
     524             : /*
     525             :  * Open relations specified by a RangeVar list.
     526             :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
     527             :  * add them to a publication.
     528             :  */
     529             : static List *
     530          84 : OpenTableList(List *tables)
     531             : {
     532          84 :     List       *relids = NIL;
     533          84 :     List       *rels = NIL;
     534             :     ListCell   *lc;
     535             : 
     536             :     /*
     537             :      * Open, share-lock, and check all the explicitly-specified relations
     538             :      */
     539         198 :     foreach(lc, tables)
     540             :     {
     541         114 :         RangeVar   *rv = castNode(RangeVar, lfirst(lc));
     542         114 :         bool        recurse = rv->inh;
     543             :         Relation    rel;
     544             :         Oid         myrelid;
     545             : 
     546             :         /* Allow query cancel in case this takes a long time */
     547         114 :         CHECK_FOR_INTERRUPTS();
     548             : 
     549         114 :         rel = table_openrv(rv, ShareUpdateExclusiveLock);
     550         114 :         myrelid = RelationGetRelid(rel);
     551             : 
     552             :         /*
     553             :          * Filter out duplicates if user specifies "foo, foo".
     554             :          *
     555             :          * Note that this algorithm is known to not be very efficient (O(N^2))
     556             :          * but given that it only works on list of tables given to us by user
     557             :          * it's deemed acceptable.
     558             :          */
     559         114 :         if (list_member_oid(relids, myrelid))
     560             :         {
     561           0 :             table_close(rel, ShareUpdateExclusiveLock);
     562           0 :             continue;
     563             :         }
     564             : 
     565         114 :         rels = lappend(rels, rel);
     566         114 :         relids = lappend_oid(relids, myrelid);
     567             : 
     568             :         /*
     569             :          * Add children of this rel, if requested, so that they too are added
     570             :          * to the publication.  A partitioned table can't have any inheritance
     571             :          * children other than its partitions, which need not be explicitly
     572             :          * added to the publication.
     573             :          */
     574         114 :         if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
     575             :         {
     576             :             List       *children;
     577             :             ListCell   *child;
     578             : 
     579         102 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
     580             :                                            NULL);
     581             : 
     582         208 :             foreach(child, children)
     583             :             {
     584         106 :                 Oid         childrelid = lfirst_oid(child);
     585             : 
     586             :                 /* Allow query cancel in case this takes a long time */
     587         106 :                 CHECK_FOR_INTERRUPTS();
     588             : 
     589             :                 /*
     590             :                  * Skip duplicates if user specified both parent and child
     591             :                  * tables.
     592             :                  */
     593         106 :                 if (list_member_oid(relids, childrelid))
     594         102 :                     continue;
     595             : 
     596             :                 /* find_all_inheritors already got lock */
     597           4 :                 rel = table_open(childrelid, NoLock);
     598           4 :                 rels = lappend(rels, rel);
     599           4 :                 relids = lappend_oid(relids, childrelid);
     600             :             }
     601             :         }
     602             :     }
     603             : 
     604          84 :     list_free(relids);
     605             : 
     606          84 :     return rels;
     607             : }
     608             : 
     609             : /*
     610             :  * Close all relations in the list.
     611             :  */
     612             : static void
     613          68 : CloseTableList(List *rels)
     614             : {
     615             :     ListCell   *lc;
     616             : 
     617         166 :     foreach(lc, rels)
     618             :     {
     619          98 :         Relation    rel = (Relation) lfirst(lc);
     620             : 
     621          98 :         table_close(rel, NoLock);
     622             :     }
     623          68 : }
     624             : 
     625             : /*
     626             :  * Add listed tables to the publication.
     627             :  */
     628             : static void
     629          76 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
     630             :                      AlterPublicationStmt *stmt)
     631             : {
     632             :     ListCell   *lc;
     633             : 
     634             :     Assert(!stmt || !stmt->for_all_tables);
     635             : 
     636         166 :     foreach(lc, rels)
     637             :     {
     638         106 :         Relation    rel = (Relation) lfirst(lc);
     639             :         ObjectAddress obj;
     640             : 
     641             :         /* Must be owner of the table or superuser. */
     642         106 :         if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
     643           4 :             aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
     644           4 :                            RelationGetRelationName(rel));
     645             : 
     646         102 :         obj = publication_add_relation(pubid, rel, if_not_exists);
     647          90 :         if (stmt)
     648             :         {
     649          54 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
     650             :                                              (Node *) stmt);
     651             : 
     652          54 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
     653             :                                        obj.objectId, 0);
     654             :         }
     655             :     }
     656          60 : }
     657             : 
     658             : /*
     659             :  * Remove listed tables from the publication.
     660             :  */
     661             : static void
     662          12 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
     663             : {
     664             :     ObjectAddress obj;
     665             :     ListCell   *lc;
     666             :     Oid         prid;
     667             : 
     668          20 :     foreach(lc, rels)
     669             :     {
     670          12 :         Relation    rel = (Relation) lfirst(lc);
     671          12 :         Oid         relid = RelationGetRelid(rel);
     672             : 
     673          12 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
     674             :                                ObjectIdGetDatum(relid),
     675             :                                ObjectIdGetDatum(pubid));
     676          12 :         if (!OidIsValid(prid))
     677             :         {
     678           4 :             if (missing_ok)
     679           0 :                 continue;
     680             : 
     681           4 :             ereport(ERROR,
     682             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     683             :                      errmsg("relation \"%s\" is not part of the publication",
     684             :                             RelationGetRelationName(rel))));
     685             :         }
     686             : 
     687           8 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
     688           8 :         performDeletion(&obj, DROP_CASCADE, 0);
     689             :     }
     690           8 : }
     691             : 
     692             : /*
     693             :  * Internal workhorse for changing a publication owner
     694             :  */
     695             : static void
     696           4 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
     697             : {
     698             :     Form_pg_publication form;
     699             : 
     700           4 :     form = (Form_pg_publication) GETSTRUCT(tup);
     701             : 
     702           4 :     if (form->pubowner == newOwnerId)
     703           0 :         return;
     704             : 
     705           4 :     if (!superuser())
     706             :     {
     707             :         AclResult   aclresult;
     708             : 
     709             :         /* Must be owner */
     710           0 :         if (!pg_publication_ownercheck(form->oid, GetUserId()))
     711           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
     712           0 :                            NameStr(form->pubname));
     713             : 
     714             :         /* Must be able to become new owner */
     715           0 :         check_is_member_of_role(GetUserId(), newOwnerId);
     716             : 
     717             :         /* New owner must have CREATE privilege on database */
     718           0 :         aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
     719           0 :         if (aclresult != ACLCHECK_OK)
     720           0 :             aclcheck_error(aclresult, OBJECT_DATABASE,
     721           0 :                            get_database_name(MyDatabaseId));
     722             : 
     723           0 :         if (form->puballtables && !superuser_arg(newOwnerId))
     724           0 :             ereport(ERROR,
     725             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     726             :                      errmsg("permission denied to change owner of publication \"%s\"",
     727             :                             NameStr(form->pubname)),
     728             :                      errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
     729             :     }
     730             : 
     731           4 :     form->pubowner = newOwnerId;
     732           4 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
     733             : 
     734             :     /* Update owner dependency reference */
     735           4 :     changeDependencyOnOwner(PublicationRelationId,
     736             :                             form->oid,
     737             :                             newOwnerId);
     738             : 
     739           4 :     InvokeObjectPostAlterHook(PublicationRelationId,
     740             :                               form->oid, 0);
     741             : }
     742             : 
     743             : /*
     744             :  * Change publication owner -- by name
     745             :  */
     746             : ObjectAddress
     747           4 : AlterPublicationOwner(const char *name, Oid newOwnerId)
     748             : {
     749             :     Oid         subid;
     750             :     HeapTuple   tup;
     751             :     Relation    rel;
     752             :     ObjectAddress address;
     753             :     Form_pg_publication pubform;
     754             : 
     755           4 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     756             : 
     757           4 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
     758             : 
     759           4 :     if (!HeapTupleIsValid(tup))
     760           0 :         ereport(ERROR,
     761             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     762             :                  errmsg("publication \"%s\" does not exist", name)));
     763             : 
     764           4 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     765           4 :     subid = pubform->oid;
     766             : 
     767           4 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
     768             : 
     769           4 :     ObjectAddressSet(address, PublicationRelationId, subid);
     770             : 
     771           4 :     heap_freetuple(tup);
     772             : 
     773           4 :     table_close(rel, RowExclusiveLock);
     774             : 
     775           4 :     return address;
     776             : }
     777             : 
     778             : /*
     779             :  * Change publication owner -- by OID
     780             :  */
     781             : void
     782           0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
     783             : {
     784             :     HeapTuple   tup;
     785             :     Relation    rel;
     786             : 
     787           0 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     788             : 
     789           0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
     790             : 
     791           0 :     if (!HeapTupleIsValid(tup))
     792           0 :         ereport(ERROR,
     793             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     794             :                  errmsg("publication with OID %u does not exist", subid)));
     795             : 
     796           0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
     797             : 
     798           0 :     heap_freetuple(tup);
     799             : 
     800           0 :     table_close(rel, RowExclusiveLock);
     801           0 : }

Generated by: LCOV version 1.13