LCOV - code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL 16beta1 Lines: 600 648 92.6 %
Date: 2023-05-30 17:15:13 Functions: 29 30 96.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * publicationcmds.c
       4             :  *      publication manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/commands/publicationcmds.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/htup_details.h"
      19             : #include "access/table.h"
      20             : #include "access/xact.h"
      21             : #include "catalog/catalog.h"
      22             : #include "catalog/indexing.h"
      23             : #include "catalog/namespace.h"
      24             : #include "catalog/objectaccess.h"
      25             : #include "catalog/objectaddress.h"
      26             : #include "catalog/partition.h"
      27             : #include "catalog/pg_database.h"
      28             : #include "catalog/pg_inherits.h"
      29             : #include "catalog/pg_namespace.h"
      30             : #include "catalog/pg_proc.h"
      31             : #include "catalog/pg_publication.h"
      32             : #include "catalog/pg_publication_namespace.h"
      33             : #include "catalog/pg_publication_rel.h"
      34             : #include "catalog/pg_type.h"
      35             : #include "commands/dbcommands.h"
      36             : #include "commands/defrem.h"
      37             : #include "commands/event_trigger.h"
      38             : #include "commands/publicationcmds.h"
      39             : #include "funcapi.h"
      40             : #include "miscadmin.h"
      41             : #include "nodes/nodeFuncs.h"
      42             : #include "parser/parse_clause.h"
      43             : #include "parser/parse_collate.h"
      44             : #include "parser/parse_relation.h"
      45             : #include "storage/lmgr.h"
      46             : #include "utils/acl.h"
      47             : #include "utils/array.h"
      48             : #include "utils/builtins.h"
      49             : #include "utils/catcache.h"
      50             : #include "utils/fmgroids.h"
      51             : #include "utils/inval.h"
      52             : #include "utils/lsyscache.h"
      53             : #include "utils/rel.h"
      54             : #include "utils/syscache.h"
      55             : #include "utils/varlena.h"
      56             : 
      57             : 
      58             : /*
      59             :  * Information used to validate the columns in the row filter expression. See
      60             :  * contain_invalid_rfcolumn_walker for details.
      61             :  */
      62             : typedef struct rf_context
      63             : {
      64             :     Bitmapset  *bms_replident;  /* bitset of replica identity columns */
      65             :     bool        pubviaroot;     /* true if we are validating the parent
      66             :                                  * relation's row filter */
      67             :     Oid         relid;          /* relid of the relation */
      68             :     Oid         parentid;       /* relid of the parent relation */
      69             : } rf_context;
      70             : 
      71             : static List *OpenTableList(List *tables);
      72             : static void CloseTableList(List *rels);
      73             : static void LockSchemaList(List *schemalist);
      74             : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
      75             :                                  AlterPublicationStmt *stmt);
      76             : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
      77             : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
      78             :                                   AlterPublicationStmt *stmt);
      79             : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
      80             : 
      81             : 
      82             : static void
      83         750 : parse_publication_options(ParseState *pstate,
      84             :                           List *options,
      85             :                           bool *publish_given,
      86             :                           PublicationActions *pubactions,
      87             :                           bool *publish_via_partition_root_given,
      88             :                           bool *publish_via_partition_root)
      89             : {
      90             :     ListCell   *lc;
      91             : 
      92         750 :     *publish_given = false;
      93         750 :     *publish_via_partition_root_given = false;
      94             : 
      95             :     /* defaults */
      96         750 :     pubactions->pubinsert = true;
      97         750 :     pubactions->pubupdate = true;
      98         750 :     pubactions->pubdelete = true;
      99         750 :     pubactions->pubtruncate = true;
     100         750 :     *publish_via_partition_root = false;
     101             : 
     102             :     /* Parse options */
     103        1020 :     foreach(lc, options)
     104             :     {
     105         288 :         DefElem    *defel = (DefElem *) lfirst(lc);
     106             : 
     107         288 :         if (strcmp(defel->defname, "publish") == 0)
     108             :         {
     109             :             char       *publish;
     110             :             List       *publish_list;
     111             :             ListCell   *lc2;
     112             : 
     113         120 :             if (*publish_given)
     114           0 :                 errorConflictingDefElem(defel, pstate);
     115             : 
     116             :             /*
     117             :              * If publish option was given only the explicitly listed actions
     118             :              * should be published.
     119             :              */
     120         120 :             pubactions->pubinsert = false;
     121         120 :             pubactions->pubupdate = false;
     122         120 :             pubactions->pubdelete = false;
     123         120 :             pubactions->pubtruncate = false;
     124             : 
     125         120 :             *publish_given = true;
     126         120 :             publish = defGetString(defel);
     127             : 
     128         120 :             if (!SplitIdentifierString(publish, ',', &publish_list))
     129           0 :                 ereport(ERROR,
     130             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     131             :                          errmsg("invalid list syntax in parameter \"%s\"",
     132             :                                 "publish")));
     133             : 
     134             :             /* Process the option list. */
     135         260 :             foreach(lc2, publish_list)
     136             :             {
     137         146 :                 char       *publish_opt = (char *) lfirst(lc2);
     138             : 
     139         146 :                 if (strcmp(publish_opt, "insert") == 0)
     140         106 :                     pubactions->pubinsert = true;
     141          40 :                 else if (strcmp(publish_opt, "update") == 0)
     142          18 :                     pubactions->pubupdate = true;
     143          22 :                 else if (strcmp(publish_opt, "delete") == 0)
     144           8 :                     pubactions->pubdelete = true;
     145          14 :                 else if (strcmp(publish_opt, "truncate") == 0)
     146           8 :                     pubactions->pubtruncate = true;
     147             :                 else
     148           6 :                     ereport(ERROR,
     149             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     150             :                              errmsg("unrecognized value for publication option \"%s\": \"%s\"",
     151             :                                     "publish", publish_opt)));
     152             :             }
     153             :         }
     154         168 :         else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
     155             :         {
     156         162 :             if (*publish_via_partition_root_given)
     157           6 :                 errorConflictingDefElem(defel, pstate);
     158         156 :             *publish_via_partition_root_given = true;
     159         156 :             *publish_via_partition_root = defGetBoolean(defel);
     160             :         }
     161             :         else
     162           6 :             ereport(ERROR,
     163             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     164             :                      errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
     165             :     }
     166         732 : }
     167             : 
     168             : /*
     169             :  * Convert the PublicationObjSpecType list into schema oid list and
     170             :  * PublicationTable list.
     171             :  */
     172             : static void
     173        1476 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
     174             :                            List **rels, List **schemas)
     175             : {
     176             :     ListCell   *cell;
     177             :     PublicationObjSpec *pubobj;
     178             : 
     179        1476 :     if (!pubobjspec_list)
     180          78 :         return;
     181             : 
     182        2958 :     foreach(cell, pubobjspec_list)
     183             :     {
     184             :         Oid         schemaid;
     185             :         List       *search_path;
     186             : 
     187        1596 :         pubobj = (PublicationObjSpec *) lfirst(cell);
     188             : 
     189        1596 :         switch (pubobj->pubobjtype)
     190             :         {
     191        1240 :             case PUBLICATIONOBJ_TABLE:
     192        1240 :                 *rels = lappend(*rels, pubobj->pubtable);
     193        1240 :                 break;
     194         332 :             case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
     195         332 :                 schemaid = get_namespace_oid(pubobj->name, false);
     196             : 
     197             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     198         302 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     199         302 :                 break;
     200          24 :             case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
     201          24 :                 search_path = fetch_search_path(false);
     202          24 :                 if (search_path == NIL) /* nothing valid in search_path? */
     203           6 :                     ereport(ERROR,
     204             :                             errcode(ERRCODE_UNDEFINED_SCHEMA),
     205             :                             errmsg("no schema has been selected for CURRENT_SCHEMA"));
     206             : 
     207          18 :                 schemaid = linitial_oid(search_path);
     208          18 :                 list_free(search_path);
     209             : 
     210             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     211          18 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     212          18 :                 break;
     213           0 :             default:
     214             :                 /* shouldn't happen */
     215           0 :                 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
     216             :                 break;
     217             :         }
     218             :     }
     219             : }
     220             : 
     221             : /*
     222             :  * Returns true if any of the columns used in the row filter WHERE expression is
     223             :  * not part of REPLICA IDENTITY, false otherwise.
     224             :  */
     225             : static bool
     226         258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
     227             : {
     228         258 :     if (node == NULL)
     229           0 :         return false;
     230             : 
     231         258 :     if (IsA(node, Var))
     232             :     {
     233         104 :         Var        *var = (Var *) node;
     234         104 :         AttrNumber  attnum = var->varattno;
     235             : 
     236             :         /*
     237             :          * If pubviaroot is true, we are validating the row filter of the
     238             :          * parent table, but the bitmap contains the replica identity
     239             :          * information of the child table. So, get the column number of the
     240             :          * child table as parent and child column order could be different.
     241             :          */
     242         104 :         if (context->pubviaroot)
     243             :         {
     244          16 :             char       *colname = get_attname(context->parentid, attnum, false);
     245             : 
     246          16 :             attnum = get_attnum(context->relid, colname);
     247             :         }
     248             : 
     249         104 :         if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
     250         104 :                            context->bms_replident))
     251          60 :             return true;
     252             :     }
     253             : 
     254         198 :     return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
     255             :                                   (void *) context);
     256             : }
     257             : 
     258             : /*
     259             :  * Check if all columns referenced in the filter expression are part of the
     260             :  * REPLICA IDENTITY index or not.
     261             :  *
     262             :  * Returns true if any invalid column is found.
     263             :  */
     264             : bool
     265         644 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     266             :                                bool pubviaroot)
     267             : {
     268             :     HeapTuple   rftuple;
     269         644 :     Oid         relid = RelationGetRelid(relation);
     270         644 :     Oid         publish_as_relid = RelationGetRelid(relation);
     271         644 :     bool        result = false;
     272             :     Datum       rfdatum;
     273             :     bool        rfisnull;
     274             : 
     275             :     /*
     276             :      * FULL means all columns are in the REPLICA IDENTITY, so all columns are
     277             :      * allowed in the row filter and we can skip the validation.
     278             :      */
     279         644 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     280         144 :         return false;
     281             : 
     282             :     /*
     283             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     284             :      * is published via this publication as we need to use its row filter
     285             :      * expression to filter the partition's changes.
     286             :      *
     287             :      * Note that even though the row filter used is for an ancestor, the
     288             :      * REPLICA IDENTITY used will be for the actual child table.
     289             :      */
     290         500 :     if (pubviaroot && relation->rd_rel->relispartition)
     291             :     {
     292             :         publish_as_relid
     293         106 :             = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     294             : 
     295         106 :         if (!OidIsValid(publish_as_relid))
     296           6 :             publish_as_relid = relid;
     297             :     }
     298             : 
     299         500 :     rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     300             :                               ObjectIdGetDatum(publish_as_relid),
     301             :                               ObjectIdGetDatum(pubid));
     302             : 
     303         500 :     if (!HeapTupleIsValid(rftuple))
     304          56 :         return false;
     305             : 
     306         444 :     rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     307             :                               Anum_pg_publication_rel_prqual,
     308             :                               &rfisnull);
     309             : 
     310         444 :     if (!rfisnull)
     311             :     {
     312         102 :         rf_context  context = {0};
     313             :         Node       *rfnode;
     314         102 :         Bitmapset  *bms = NULL;
     315             : 
     316         102 :         context.pubviaroot = pubviaroot;
     317         102 :         context.parentid = publish_as_relid;
     318         102 :         context.relid = relid;
     319             : 
     320             :         /* Remember columns that are part of the REPLICA IDENTITY */
     321         102 :         bms = RelationGetIndexAttrBitmap(relation,
     322             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     323             : 
     324         102 :         context.bms_replident = bms;
     325         102 :         rfnode = stringToNode(TextDatumGetCString(rfdatum));
     326         102 :         result = contain_invalid_rfcolumn_walker(rfnode, &context);
     327             :     }
     328             : 
     329         444 :     ReleaseSysCache(rftuple);
     330             : 
     331         444 :     return result;
     332             : }
     333             : 
     334             : /*
     335             :  * Check if all columns referenced in the REPLICA IDENTITY are covered by
     336             :  * the column list.
     337             :  *
     338             :  * Returns true if any replica identity column is not covered by column list.
     339             :  */
     340             : bool
     341         644 : pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     342             :                                     bool pubviaroot)
     343             : {
     344             :     HeapTuple   tuple;
     345         644 :     Oid         relid = RelationGetRelid(relation);
     346         644 :     Oid         publish_as_relid = RelationGetRelid(relation);
     347         644 :     bool        result = false;
     348             :     Datum       datum;
     349             :     bool        isnull;
     350             : 
     351             :     /*
     352             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     353             :      * is published via this publication as we need to use its column list for
     354             :      * the changes.
     355             :      *
     356             :      * Note that even though the column list used is for an ancestor, the
     357             :      * REPLICA IDENTITY used will be for the actual child table.
     358             :      */
     359         644 :     if (pubviaroot && relation->rd_rel->relispartition)
     360             :     {
     361         128 :         publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     362             : 
     363         128 :         if (!OidIsValid(publish_as_relid))
     364           6 :             publish_as_relid = relid;
     365             :     }
     366             : 
     367         644 :     tuple = SearchSysCache2(PUBLICATIONRELMAP,
     368             :                             ObjectIdGetDatum(publish_as_relid),
     369             :                             ObjectIdGetDatum(pubid));
     370             : 
     371         644 :     if (!HeapTupleIsValid(tuple))
     372          64 :         return false;
     373             : 
     374         580 :     datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
     375             :                             Anum_pg_publication_rel_prattrs,
     376             :                             &isnull);
     377             : 
     378         580 :     if (!isnull)
     379             :     {
     380             :         int         x;
     381             :         Bitmapset  *idattrs;
     382         230 :         Bitmapset  *columns = NULL;
     383             : 
     384             :         /* With REPLICA IDENTITY FULL, no column list is allowed. */
     385         230 :         if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     386          36 :             result = true;
     387             : 
     388             :         /* Transform the column list datum to a bitmapset. */
     389         230 :         columns = pub_collist_to_bitmapset(NULL, datum, NULL);
     390             : 
     391             :         /* Remember columns that are part of the REPLICA IDENTITY */
     392         230 :         idattrs = RelationGetIndexAttrBitmap(relation,
     393             :                                              INDEX_ATTR_BITMAP_IDENTITY_KEY);
     394             : 
     395             :         /*
     396             :          * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
     397             :          * offset (to handle system columns the usual way), while column list
     398             :          * does not use offset, so we can't do bms_is_subset(). Instead, we
     399             :          * have to loop over the idattrs and check all of them are in the
     400             :          * list.
     401             :          */
     402         230 :         x = -1;
     403         350 :         while ((x = bms_next_member(idattrs, x)) >= 0)
     404             :         {
     405         192 :             AttrNumber  attnum = (x + FirstLowInvalidHeapAttributeNumber);
     406             : 
     407             :             /*
     408             :              * If pubviaroot is true, we are validating the column list of the
     409             :              * parent table, but the bitmap contains the replica identity
     410             :              * information of the child table. The parent/child attnums may
     411             :              * not match, so translate them to the parent - get the attname
     412             :              * from the child, and look it up in the parent.
     413             :              */
     414         192 :             if (pubviaroot)
     415             :             {
     416             :                 /* attribute name in the child table */
     417          80 :                 char       *colname = get_attname(relid, attnum, false);
     418             : 
     419             :                 /*
     420             :                  * Determine the attnum for the attribute name in parent (we
     421             :                  * are using the column list defined on the parent).
     422             :                  */
     423          80 :                 attnum = get_attnum(publish_as_relid, colname);
     424             :             }
     425             : 
     426             :             /* replica identity column, not covered by the column list */
     427         192 :             if (!bms_is_member(attnum, columns))
     428             :             {
     429          72 :                 result = true;
     430          72 :                 break;
     431             :             }
     432             :         }
     433             : 
     434         230 :         bms_free(idattrs);
     435         230 :         bms_free(columns);
     436             :     }
     437             : 
     438         580 :     ReleaseSysCache(tuple);
     439             : 
     440         580 :     return result;
     441             : }
     442             : 
     443             : /* check_functions_in_node callback */
     444             : static bool
     445         396 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
     446             : {
     447         396 :     return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
     448             :             func_id >= FirstNormalObjectId);
     449             : }
     450             : 
     451             : /*
     452             :  * The row filter walker checks if the row filter expression is a "simple
     453             :  * expression".
     454             :  *
     455             :  * It allows only simple or compound expressions such as:
     456             :  * - (Var Op Const)
     457             :  * - (Var Op Var)
     458             :  * - (Var Op Const) AND/OR (Var Op Const)
     459             :  * - etc
     460             :  * (where Var is a column of the table this filter belongs to)
     461             :  *
     462             :  * The simple expression has the following restrictions:
     463             :  * - User-defined operators are not allowed;
     464             :  * - User-defined functions are not allowed;
     465             :  * - User-defined types are not allowed;
     466             :  * - User-defined collations are not allowed;
     467             :  * - Non-immutable built-in functions are not allowed;
     468             :  * - System columns are not allowed.
     469             :  *
     470             :  * NOTES
     471             :  *
     472             :  * We don't allow user-defined functions/operators/types/collations because
     473             :  * (a) if a user drops a user-defined object used in a row filter expression or
     474             :  * if there is any other error while using it, the logical decoding
     475             :  * infrastructure won't be able to recover from such an error even if the
     476             :  * object is recreated again because a historic snapshot is used to evaluate
     477             :  * the row filter;
     478             :  * (b) a user-defined function can be used to access tables that could have
     479             :  * unpleasant results because a historic snapshot is used. That's why only
     480             :  * immutable built-in functions are allowed in row filter expressions.
     481             :  *
     482             :  * We don't allow system columns because currently, we don't have that
     483             :  * information in the tuple passed to downstream. Also, as we don't replicate
     484             :  * those to subscribers, there doesn't seem to be a need for a filter on those
     485             :  * columns.
     486             :  *
     487             :  * We can allow other node types after more analysis and testing.
     488             :  */
     489             : static bool
     490        1328 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
     491             : {
     492        1328 :     char       *errdetail_msg = NULL;
     493             : 
     494        1328 :     if (node == NULL)
     495           6 :         return false;
     496             : 
     497        1322 :     switch (nodeTag(node))
     498             :     {
     499         356 :         case T_Var:
     500             :             /* System columns are not allowed. */
     501         356 :             if (((Var *) node)->varattno < InvalidAttrNumber)
     502           6 :                 errdetail_msg = _("System columns are not allowed.");
     503         356 :             break;
     504         352 :         case T_OpExpr:
     505             :         case T_DistinctExpr:
     506             :         case T_NullIfExpr:
     507             :             /* OK, except user-defined operators are not allowed. */
     508         352 :             if (((OpExpr *) node)->opno >= FirstNormalObjectId)
     509           6 :                 errdetail_msg = _("User-defined operators are not allowed.");
     510         352 :             break;
     511           6 :         case T_ScalarArrayOpExpr:
     512             :             /* OK, except user-defined operators are not allowed. */
     513           6 :             if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
     514           0 :                 errdetail_msg = _("User-defined operators are not allowed.");
     515             : 
     516             :             /*
     517             :              * We don't need to check the hashfuncid and negfuncid of
     518             :              * ScalarArrayOpExpr as those functions are only built for a
     519             :              * subquery.
     520             :              */
     521           6 :             break;
     522           6 :         case T_RowCompareExpr:
     523             :             {
     524             :                 ListCell   *opid;
     525             : 
     526             :                 /* OK, except user-defined operators are not allowed. */
     527          18 :                 foreach(opid, ((RowCompareExpr *) node)->opnos)
     528             :                 {
     529          12 :                     if (lfirst_oid(opid) >= FirstNormalObjectId)
     530             :                     {
     531           0 :                         errdetail_msg = _("User-defined operators are not allowed.");
     532           0 :                         break;
     533             :                     }
     534             :                 }
     535             :             }
     536           6 :             break;
     537         596 :         case T_Const:
     538             :         case T_FuncExpr:
     539             :         case T_BoolExpr:
     540             :         case T_RelabelType:
     541             :         case T_CollateExpr:
     542             :         case T_CaseExpr:
     543             :         case T_CaseTestExpr:
     544             :         case T_ArrayExpr:
     545             :         case T_RowExpr:
     546             :         case T_CoalesceExpr:
     547             :         case T_MinMaxExpr:
     548             :         case T_XmlExpr:
     549             :         case T_NullTest:
     550             :         case T_BooleanTest:
     551             :         case T_List:
     552             :             /* OK, supported */
     553         596 :             break;
     554           6 :         default:
     555           6 :             errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
     556           6 :             break;
     557             :     }
     558             : 
     559             :     /*
     560             :      * For all the supported nodes, if we haven't already found a problem,
     561             :      * check the types, functions, and collations used in it.  We check List
     562             :      * by walking through each element.
     563             :      */
     564        1322 :     if (!errdetail_msg && !IsA(node, List))
     565             :     {
     566        1250 :         if (exprType(node) >= FirstNormalObjectId)
     567           6 :             errdetail_msg = _("User-defined types are not allowed.");
     568        1244 :         else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
     569             :                                          (void *) pstate))
     570          12 :             errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
     571        2464 :         else if (exprCollation(node) >= FirstNormalObjectId ||
     572        1232 :                  exprInputCollation(node) >= FirstNormalObjectId)
     573           6 :             errdetail_msg = _("User-defined collations are not allowed.");
     574             :     }
     575             : 
     576             :     /*
     577             :      * If we found a problem in this node, throw error now. Otherwise keep
     578             :      * going.
     579             :      */
     580        1322 :     if (errdetail_msg)
     581          42 :         ereport(ERROR,
     582             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     583             :                  errmsg("invalid publication WHERE expression"),
     584             :                  errdetail_internal("%s", errdetail_msg),
     585             :                  parser_errposition(pstate, exprLocation(node))));
     586             : 
     587        1280 :     return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
     588             :                                   (void *) pstate);
     589             : }
     590             : 
     591             : /*
     592             :  * Check if the row filter expression is a "simple expression".
     593             :  *
     594             :  * See check_simple_rowfilter_expr_walker for details.
     595             :  */
     596             : static bool
     597         344 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
     598             : {
     599         344 :     return check_simple_rowfilter_expr_walker(node, pstate);
     600             : }
     601             : 
     602             : /*
     603             :  * Transform the publication WHERE expression for all the relations in the list,
     604             :  * ensuring it is coerced to boolean and necessary collation information is
     605             :  * added if required, and add a new nsitem/RTE for the associated relation to
     606             :  * the ParseState's namespace list.
     607             :  *
     608             :  * Also check the publication row filter expression and throw an error if
     609             :  * anything not permitted or unexpected is encountered.
     610             :  */
     611             : static void
     612        1024 : TransformPubWhereClauses(List *tables, const char *queryString,
     613             :                          bool pubviaroot)
     614             : {
     615             :     ListCell   *lc;
     616             : 
     617        2048 :     foreach(lc, tables)
     618             :     {
     619             :         ParseNamespaceItem *nsitem;
     620        1084 :         Node       *whereclause = NULL;
     621             :         ParseState *pstate;
     622        1084 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     623             : 
     624        1084 :         if (pri->whereClause == NULL)
     625         722 :             continue;
     626             : 
     627             :         /*
     628             :          * If the publication doesn't publish changes via the root partitioned
     629             :          * table, the partition's row filter will be used. So disallow using
     630             :          * WHERE clause on partitioned table in this case.
     631             :          */
     632         362 :         if (!pubviaroot &&
     633         340 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     634           6 :             ereport(ERROR,
     635             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     636             :                      errmsg("cannot use publication WHERE clause for relation \"%s\"",
     637             :                             RelationGetRelationName(pri->relation)),
     638             :                      errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
     639             :                                "publish_via_partition_root")));
     640             : 
     641             :         /*
     642             :          * A fresh pstate is required so that we only have "this" table in its
     643             :          * rangetable
     644             :          */
     645         356 :         pstate = make_parsestate(NULL);
     646         356 :         pstate->p_sourcetext = queryString;
     647         356 :         nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
     648             :                                                AccessShareLock, NULL,
     649             :                                                false, false);
     650         356 :         addNSItemToQuery(pstate, nsitem, false, true, true);
     651             : 
     652         356 :         whereclause = transformWhereClause(pstate,
     653         356 :                                            copyObject(pri->whereClause),
     654             :                                            EXPR_KIND_WHERE,
     655             :                                            "PUBLICATION WHERE");
     656             : 
     657             :         /* Fix up collation information */
     658         344 :         assign_expr_collations(pstate, whereclause);
     659             : 
     660             :         /*
     661             :          * We allow only simple expressions in row filters. See
     662             :          * check_simple_rowfilter_expr_walker.
     663             :          */
     664         344 :         check_simple_rowfilter_expr(whereclause, pstate);
     665             : 
     666         302 :         free_parsestate(pstate);
     667             : 
     668         302 :         pri->whereClause = whereclause;
     669             :     }
     670         964 : }
     671             : 
     672             : 
     673             : /*
     674             :  * Given a list of tables that are going to be added to a publication,
     675             :  * verify that they fulfill the necessary preconditions, namely: no tables
     676             :  * have a column list if any schema is published; and partitioned tables do
     677             :  * not have column lists if publish_via_partition_root is not set.
     678             :  *
     679             :  * 'publish_schema' indicates that the publication contains any TABLES IN
     680             :  * SCHEMA elements (newly added in this command, or preexisting).
     681             :  * 'pubviaroot' is the value of publish_via_partition_root.
     682             :  */
     683             : static void
     684         964 : CheckPubRelationColumnList(char *pubname, List *tables,
     685             :                            bool publish_schema, bool pubviaroot)
     686             : {
     687             :     ListCell   *lc;
     688             : 
     689        1958 :     foreach(lc, tables)
     690             :     {
     691        1024 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     692             : 
     693        1024 :         if (pri->columns == NIL)
     694         698 :             continue;
     695             : 
     696             :         /*
     697             :          * Disallow specifying column list if any schema is in the
     698             :          * publication.
     699             :          *
     700             :          * XXX We could instead just forbid the case when the publication
     701             :          * tries to publish the table with a column list and a schema for that
     702             :          * table. However, if we do that then we need a restriction during
     703             :          * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
     704             :          * seem to be a good idea.
     705             :          */
     706         326 :         if (publish_schema)
     707          24 :             ereport(ERROR,
     708             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     709             :                     errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     710             :                            get_namespace_name(RelationGetNamespace(pri->relation)),
     711             :                            RelationGetRelationName(pri->relation), pubname),
     712             :                     errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
     713             : 
     714             :         /*
     715             :          * If the publication doesn't publish changes via the root partitioned
     716             :          * table, the partition's column list will be used. So disallow using
     717             :          * a column list on the partitioned table in this case.
     718             :          */
     719         302 :         if (!pubviaroot &&
     720         226 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     721           6 :             ereport(ERROR,
     722             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     723             :                      errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     724             :                             get_namespace_name(RelationGetNamespace(pri->relation)),
     725             :                             RelationGetRelationName(pri->relation), pubname),
     726             :                      errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
     727             :                                "publish_via_partition_root")));
     728             :     }
     729         934 : }
     730             : 
     731             : /*
     732             :  * Create new publication.
     733             :  */
     734             : ObjectAddress
     735         652 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
     736             : {
     737             :     Relation    rel;
     738             :     ObjectAddress myself;
     739             :     Oid         puboid;
     740             :     bool        nulls[Natts_pg_publication];
     741             :     Datum       values[Natts_pg_publication];
     742             :     HeapTuple   tup;
     743             :     bool        publish_given;
     744             :     PublicationActions pubactions;
     745             :     bool        publish_via_partition_root_given;
     746             :     bool        publish_via_partition_root;
     747             :     AclResult   aclresult;
     748         652 :     List       *relations = NIL;
     749         652 :     List       *schemaidlist = NIL;
     750             : 
     751             :     /* must have CREATE privilege on database */
     752         652 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
     753         652 :     if (aclresult != ACLCHECK_OK)
     754           6 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     755           6 :                        get_database_name(MyDatabaseId));
     756             : 
     757             :     /* FOR ALL TABLES requires superuser */
     758         646 :     if (stmt->for_all_tables && !superuser())
     759           0 :         ereport(ERROR,
     760             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     761             :                  errmsg("must be superuser to create FOR ALL TABLES publication")));
     762             : 
     763         646 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     764             : 
     765             :     /* Check if name is used */
     766         646 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     767             :                              CStringGetDatum(stmt->pubname));
     768         646 :     if (OidIsValid(puboid))
     769           6 :         ereport(ERROR,
     770             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     771             :                  errmsg("publication \"%s\" already exists",
     772             :                         stmt->pubname)));
     773             : 
     774             :     /* Form a tuple. */
     775         640 :     memset(values, 0, sizeof(values));
     776         640 :     memset(nulls, false, sizeof(nulls));
     777             : 
     778         640 :     values[Anum_pg_publication_pubname - 1] =
     779         640 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
     780         640 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
     781             : 
     782         640 :     parse_publication_options(pstate,
     783             :                               stmt->options,
     784             :                               &publish_given, &pubactions,
     785             :                               &publish_via_partition_root_given,
     786             :                               &publish_via_partition_root);
     787             : 
     788         622 :     puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
     789             :                                 Anum_pg_publication_oid);
     790         622 :     values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
     791         622 :     values[Anum_pg_publication_puballtables - 1] =
     792         622 :         BoolGetDatum(stmt->for_all_tables);
     793         622 :     values[Anum_pg_publication_pubinsert - 1] =
     794         622 :         BoolGetDatum(pubactions.pubinsert);
     795         622 :     values[Anum_pg_publication_pubupdate - 1] =
     796         622 :         BoolGetDatum(pubactions.pubupdate);
     797         622 :     values[Anum_pg_publication_pubdelete - 1] =
     798         622 :         BoolGetDatum(pubactions.pubdelete);
     799         622 :     values[Anum_pg_publication_pubtruncate - 1] =
     800         622 :         BoolGetDatum(pubactions.pubtruncate);
     801         622 :     values[Anum_pg_publication_pubviaroot - 1] =
     802         622 :         BoolGetDatum(publish_via_partition_root);
     803             : 
     804         622 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     805             : 
     806             :     /* Insert tuple into catalog. */
     807         622 :     CatalogTupleInsert(rel, tup);
     808         622 :     heap_freetuple(tup);
     809             : 
     810         622 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
     811             : 
     812         622 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
     813             : 
     814             :     /* Make the changes visible. */
     815         622 :     CommandCounterIncrement();
     816             : 
     817             :     /* Associate objects with the publication. */
     818         622 :     if (stmt->for_all_tables)
     819             :     {
     820             :         /* Invalidate relcache so that publication info is rebuilt. */
     821          48 :         CacheInvalidateRelcacheAll();
     822             :     }
     823             :     else
     824             :     {
     825         574 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
     826             :                                    &schemaidlist);
     827             : 
     828             :         /* FOR TABLES IN SCHEMA requires superuser */
     829         556 :         if (schemaidlist != NIL && !superuser())
     830           6 :             ereport(ERROR,
     831             :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     832             :                     errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
     833             : 
     834         550 :         if (relations != NIL)
     835             :         {
     836             :             List       *rels;
     837             : 
     838         364 :             rels = OpenTableList(relations);
     839         340 :             TransformPubWhereClauses(rels, pstate->p_sourcetext,
     840             :                                      publish_via_partition_root);
     841             : 
     842         316 :             CheckPubRelationColumnList(stmt->pubname, rels,
     843             :                                        schemaidlist != NIL,
     844             :                                        publish_via_partition_root);
     845             : 
     846         310 :             PublicationAddTables(puboid, rels, true, NULL);
     847         284 :             CloseTableList(rels);
     848             :         }
     849             : 
     850         470 :         if (schemaidlist != NIL)
     851             :         {
     852             :             /*
     853             :              * Schema lock is held until the publication is created to prevent
     854             :              * concurrent schema deletion.
     855             :              */
     856         134 :             LockSchemaList(schemaidlist);
     857         134 :             PublicationAddSchemas(puboid, schemaidlist, true, NULL);
     858             :         }
     859             :     }
     860             : 
     861         512 :     table_close(rel, RowExclusiveLock);
     862             : 
     863         512 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
     864             : 
     865         512 :     if (wal_level != WAL_LEVEL_LOGICAL)
     866         306 :         ereport(WARNING,
     867             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     868             :                  errmsg("wal_level is insufficient to publish logical changes"),
     869             :                  errhint("Set wal_level to \"logical\" before creating subscriptions.")));
     870             : 
     871         512 :     return myself;
     872             : }
     873             : 
     874             : /*
     875             :  * Change options of a publication.
     876             :  */
     877             : static void
     878         110 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
     879             :                         Relation rel, HeapTuple tup)
     880             : {
     881             :     bool        nulls[Natts_pg_publication];
     882             :     bool        replaces[Natts_pg_publication];
     883             :     Datum       values[Natts_pg_publication];
     884             :     bool        publish_given;
     885             :     PublicationActions pubactions;
     886             :     bool        publish_via_partition_root_given;
     887             :     bool        publish_via_partition_root;
     888             :     ObjectAddress obj;
     889             :     Form_pg_publication pubform;
     890         110 :     List       *root_relids = NIL;
     891             :     ListCell   *lc;
     892             : 
     893         110 :     parse_publication_options(pstate,
     894             :                               stmt->options,
     895             :                               &publish_given, &pubactions,
     896             :                               &publish_via_partition_root_given,
     897             :                               &publish_via_partition_root);
     898             : 
     899         110 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     900             : 
     901             :     /*
     902             :      * If the publication doesn't publish changes via the root partitioned
     903             :      * table, the partition's row filter and column list will be used. So
     904             :      * disallow using WHERE clause and column lists on partitioned table in
     905             :      * this case.
     906             :      */
     907         110 :     if (!pubform->puballtables && publish_via_partition_root_given &&
     908          80 :         !publish_via_partition_root)
     909             :     {
     910             :         /*
     911             :          * Lock the publication so nobody else can do anything with it. This
     912             :          * prevents concurrent alter to add partitioned table(s) with WHERE
     913             :          * clause(s) and/or column lists which we don't allow when not
     914             :          * publishing via root.
     915             :          */
     916          48 :         LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
     917             :                            AccessShareLock);
     918             : 
     919          48 :         root_relids = GetPublicationRelations(pubform->oid,
     920             :                                               PUBLICATION_PART_ROOT);
     921             : 
     922          84 :         foreach(lc, root_relids)
     923             :         {
     924          48 :             Oid         relid = lfirst_oid(lc);
     925             :             HeapTuple   rftuple;
     926             :             char        relkind;
     927             :             char       *relname;
     928             :             bool        has_rowfilter;
     929             :             bool        has_collist;
     930             : 
     931             :             /*
     932             :              * Beware: we don't have lock on the relations, so cope silently
     933             :              * with the cache lookups returning NULL.
     934             :              */
     935             : 
     936          48 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     937             :                                       ObjectIdGetDatum(relid),
     938             :                                       ObjectIdGetDatum(pubform->oid));
     939          48 :             if (!HeapTupleIsValid(rftuple))
     940           0 :                 continue;
     941          48 :             has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
     942          48 :             has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
     943          48 :             if (!has_rowfilter && !has_collist)
     944             :             {
     945          12 :                 ReleaseSysCache(rftuple);
     946          12 :                 continue;
     947             :             }
     948             : 
     949          36 :             relkind = get_rel_relkind(relid);
     950          36 :             if (relkind != RELKIND_PARTITIONED_TABLE)
     951             :             {
     952          24 :                 ReleaseSysCache(rftuple);
     953          24 :                 continue;
     954             :             }
     955          12 :             relname = get_rel_name(relid);
     956          12 :             if (relname == NULL)    /* table concurrently dropped */
     957             :             {
     958           0 :                 ReleaseSysCache(rftuple);
     959           0 :                 continue;
     960             :             }
     961             : 
     962          12 :             if (has_rowfilter)
     963           6 :                 ereport(ERROR,
     964             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     965             :                          errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
     966             :                                 "publish_via_partition_root",
     967             :                                 stmt->pubname),
     968             :                          errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
     969             :                                    relname, "publish_via_partition_root")));
     970             :             Assert(has_collist);
     971           6 :             ereport(ERROR,
     972             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     973             :                      errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
     974             :                             "publish_via_partition_root",
     975             :                             stmt->pubname),
     976             :                      errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
     977             :                                relname, "publish_via_partition_root")));
     978             :         }
     979             :     }
     980             : 
     981             :     /* Everything ok, form a new tuple. */
     982          98 :     memset(values, 0, sizeof(values));
     983          98 :     memset(nulls, false, sizeof(nulls));
     984          98 :     memset(replaces, false, sizeof(replaces));
     985             : 
     986          98 :     if (publish_given)
     987             :     {
     988          28 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
     989          28 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
     990             : 
     991          28 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
     992          28 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
     993             : 
     994          28 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
     995          28 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
     996             : 
     997          28 :         values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
     998          28 :         replaces[Anum_pg_publication_pubtruncate - 1] = true;
     999             :     }
    1000             : 
    1001          98 :     if (publish_via_partition_root_given)
    1002             :     {
    1003          70 :         values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
    1004          70 :         replaces[Anum_pg_publication_pubviaroot - 1] = true;
    1005             :     }
    1006             : 
    1007          98 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1008             :                             replaces);
    1009             : 
    1010             :     /* Update the catalog. */
    1011          98 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1012             : 
    1013          98 :     CommandCounterIncrement();
    1014             : 
    1015          98 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1016             : 
    1017             :     /* Invalidate the relcache. */
    1018          98 :     if (pubform->puballtables)
    1019             :     {
    1020           8 :         CacheInvalidateRelcacheAll();
    1021             :     }
    1022             :     else
    1023             :     {
    1024          90 :         List       *relids = NIL;
    1025          90 :         List       *schemarelids = NIL;
    1026             : 
    1027             :         /*
    1028             :          * For any partitioned tables contained in the publication, we must
    1029             :          * invalidate all partitions contained in the respective partition
    1030             :          * trees, not just those explicitly mentioned in the publication.
    1031             :          */
    1032          90 :         if (root_relids == NIL)
    1033          54 :             relids = GetPublicationRelations(pubform->oid,
    1034             :                                              PUBLICATION_PART_ALL);
    1035             :         else
    1036             :         {
    1037             :             /*
    1038             :              * We already got tables explicitly mentioned in the publication.
    1039             :              * Now get all partitions for the partitioned table in the list.
    1040             :              */
    1041          72 :             foreach(lc, root_relids)
    1042          36 :                 relids = GetPubPartitionOptionRelations(relids,
    1043             :                                                         PUBLICATION_PART_ALL,
    1044             :                                                         lfirst_oid(lc));
    1045             :         }
    1046             : 
    1047          90 :         schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
    1048             :                                                         PUBLICATION_PART_ALL);
    1049          90 :         relids = list_concat_unique_oid(relids, schemarelids);
    1050             : 
    1051          90 :         InvalidatePublicationRels(relids);
    1052             :     }
    1053             : 
    1054          98 :     ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
    1055          98 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1056             :                                      (Node *) stmt);
    1057             : 
    1058          98 :     InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
    1059          98 : }
    1060             : 
    1061             : /*
    1062             :  * Invalidate the relations.
    1063             :  */
    1064             : void
    1065        2144 : InvalidatePublicationRels(List *relids)
    1066             : {
    1067             :     /*
    1068             :      * We don't want to send too many individual messages, at some point it's
    1069             :      * cheaper to just reset whole relcache.
    1070             :      */
    1071        2144 :     if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
    1072             :     {
    1073             :         ListCell   *lc;
    1074             : 
    1075       18834 :         foreach(lc, relids)
    1076       16690 :             CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
    1077             :     }
    1078             :     else
    1079           0 :         CacheInvalidateRelcacheAll();
    1080        2144 : }
    1081             : 
    1082             : /*
    1083             :  * Add or remove table to/from publication.
    1084             :  */
    1085             : static void
    1086         842 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
    1087             :                        List *tables, const char *queryString,
    1088             :                        bool publish_schema)
    1089             : {
    1090         842 :     List       *rels = NIL;
    1091         842 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1092         842 :     Oid         pubid = pubform->oid;
    1093             : 
    1094             :     /*
    1095             :      * Nothing to do if no objects, except in SET: for that it is quite
    1096             :      * possible that user has not specified any tables in which case we need
    1097             :      * to remove all the existing tables.
    1098             :      */
    1099         842 :     if (!tables && stmt->action != AP_SetObjects)
    1100          66 :         return;
    1101             : 
    1102         776 :     rels = OpenTableList(tables);
    1103             : 
    1104         776 :     if (stmt->action == AP_AddObjects)
    1105             :     {
    1106         262 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1107             : 
    1108         244 :         publish_schema |= is_schema_publication(pubid);
    1109             : 
    1110         244 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1111         244 :                                    pubform->pubviaroot);
    1112             : 
    1113         232 :         PublicationAddTables(pubid, rels, false, stmt);
    1114             :     }
    1115         514 :     else if (stmt->action == AP_DropObjects)
    1116          92 :         PublicationDropTables(pubid, rels, false);
    1117             :     else                        /* AP_SetObjects */
    1118             :     {
    1119         422 :         List       *oldrelids = GetPublicationRelations(pubid,
    1120             :                                                         PUBLICATION_PART_ROOT);
    1121         422 :         List       *delrels = NIL;
    1122             :         ListCell   *oldlc;
    1123             : 
    1124         422 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1125             : 
    1126         404 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1127         404 :                                    pubform->pubviaroot);
    1128             : 
    1129             :         /*
    1130             :          * To recreate the relation list for the publication, look for
    1131             :          * existing relations that do not need to be dropped.
    1132             :          */
    1133         770 :         foreach(oldlc, oldrelids)
    1134             :         {
    1135         378 :             Oid         oldrelid = lfirst_oid(oldlc);
    1136             :             ListCell   *newlc;
    1137             :             PublicationRelInfo *oldrel;
    1138         378 :             bool        found = false;
    1139             :             HeapTuple   rftuple;
    1140         378 :             Node       *oldrelwhereclause = NULL;
    1141         378 :             Bitmapset  *oldcolumns = NULL;
    1142             : 
    1143             :             /* look up the cache for the old relmap */
    1144         378 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1145             :                                       ObjectIdGetDatum(oldrelid),
    1146             :                                       ObjectIdGetDatum(pubid));
    1147             : 
    1148             :             /*
    1149             :              * See if the existing relation currently has a WHERE clause or a
    1150             :              * column list. We need to compare those too.
    1151             :              */
    1152         378 :             if (HeapTupleIsValid(rftuple))
    1153             :             {
    1154         378 :                 bool        isnull = true;
    1155             :                 Datum       whereClauseDatum;
    1156             :                 Datum       columnListDatum;
    1157             : 
    1158             :                 /* Load the WHERE clause for this table. */
    1159         378 :                 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1160             :                                                    Anum_pg_publication_rel_prqual,
    1161             :                                                    &isnull);
    1162         378 :                 if (!isnull)
    1163         210 :                     oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
    1164             : 
    1165             :                 /* Transform the int2vector column list to a bitmap. */
    1166         378 :                 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1167             :                                                   Anum_pg_publication_rel_prattrs,
    1168             :                                                   &isnull);
    1169             : 
    1170         378 :                 if (!isnull)
    1171         124 :                     oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
    1172             : 
    1173         378 :                 ReleaseSysCache(rftuple);
    1174             :             }
    1175             : 
    1176         742 :             foreach(newlc, rels)
    1177             :             {
    1178             :                 PublicationRelInfo *newpubrel;
    1179             :                 Oid         newrelid;
    1180         380 :                 Bitmapset  *newcolumns = NULL;
    1181             : 
    1182         380 :                 newpubrel = (PublicationRelInfo *) lfirst(newlc);
    1183         380 :                 newrelid = RelationGetRelid(newpubrel->relation);
    1184             : 
    1185             :                 /*
    1186             :                  * If the new publication has column list, transform it to a
    1187             :                  * bitmap too.
    1188             :                  */
    1189         380 :                 if (newpubrel->columns)
    1190             :                 {
    1191             :                     ListCell   *lc;
    1192             : 
    1193         354 :                     foreach(lc, newpubrel->columns)
    1194             :                     {
    1195         218 :                         char       *colname = strVal(lfirst(lc));
    1196         218 :                         AttrNumber  attnum = get_attnum(newrelid, colname);
    1197             : 
    1198         218 :                         newcolumns = bms_add_member(newcolumns, attnum);
    1199             :                     }
    1200             :                 }
    1201             : 
    1202             :                 /*
    1203             :                  * Check if any of the new set of relations matches with the
    1204             :                  * existing relations in the publication. Additionally, if the
    1205             :                  * relation has an associated WHERE clause, check the WHERE
    1206             :                  * expressions also match. Same for the column list. Drop the
    1207             :                  * rest.
    1208             :                  */
    1209         380 :                 if (RelationGetRelid(newpubrel->relation) == oldrelid)
    1210             :                 {
    1211         260 :                     if (equal(oldrelwhereclause, newpubrel->whereClause) &&
    1212          68 :                         bms_equal(oldcolumns, newcolumns))
    1213             :                     {
    1214          16 :                         found = true;
    1215          16 :                         break;
    1216             :                     }
    1217             :                 }
    1218             :             }
    1219             : 
    1220             :             /*
    1221             :              * Add the non-matched relations to a list so that they can be
    1222             :              * dropped.
    1223             :              */
    1224         378 :             if (!found)
    1225             :             {
    1226         362 :                 oldrel = palloc(sizeof(PublicationRelInfo));
    1227         362 :                 oldrel->whereClause = NULL;
    1228         362 :                 oldrel->columns = NIL;
    1229         362 :                 oldrel->relation = table_open(oldrelid,
    1230             :                                               ShareUpdateExclusiveLock);
    1231         362 :                 delrels = lappend(delrels, oldrel);
    1232             :             }
    1233             :         }
    1234             : 
    1235             :         /* And drop them. */
    1236         392 :         PublicationDropTables(pubid, delrels, true);
    1237             : 
    1238             :         /*
    1239             :          * Don't bother calculating the difference for adding, we'll catch and
    1240             :          * skip existing ones when doing catalog update.
    1241             :          */
    1242         392 :         PublicationAddTables(pubid, rels, true, stmt);
    1243             : 
    1244         392 :         CloseTableList(delrels);
    1245             :     }
    1246             : 
    1247         662 :     CloseTableList(rels);
    1248             : }
    1249             : 
    1250             : /*
    1251             :  * Alter the publication schemas.
    1252             :  *
    1253             :  * Add or remove schemas to/from publication.
    1254             :  */
    1255             : static void
    1256         728 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
    1257             :                         HeapTuple tup, List *schemaidlist)
    1258             : {
    1259         728 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1260             : 
    1261             :     /*
    1262             :      * Nothing to do if no objects, except in SET: for that it is quite
    1263             :      * possible that user has not specified any schemas in which case we need
    1264             :      * to remove all the existing schemas.
    1265             :      */
    1266         728 :     if (!schemaidlist && stmt->action != AP_SetObjects)
    1267         270 :         return;
    1268             : 
    1269             :     /*
    1270             :      * Schema lock is held until the publication is altered to prevent
    1271             :      * concurrent schema deletion.
    1272             :      */
    1273         458 :     LockSchemaList(schemaidlist);
    1274         458 :     if (stmt->action == AP_AddObjects)
    1275             :     {
    1276             :         ListCell   *lc;
    1277             :         List       *reloids;
    1278             : 
    1279          28 :         reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
    1280             : 
    1281          36 :         foreach(lc, reloids)
    1282             :         {
    1283             :             HeapTuple   coltuple;
    1284             : 
    1285          14 :             coltuple = SearchSysCache2(PUBLICATIONRELMAP,
    1286             :                                        ObjectIdGetDatum(lfirst_oid(lc)),
    1287             :                                        ObjectIdGetDatum(pubform->oid));
    1288             : 
    1289          14 :             if (!HeapTupleIsValid(coltuple))
    1290           0 :                 continue;
    1291             : 
    1292             :             /*
    1293             :              * Disallow adding schema if column list is already part of the
    1294             :              * publication. See CheckPubRelationColumnList.
    1295             :              */
    1296          14 :             if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
    1297           6 :                 ereport(ERROR,
    1298             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1299             :                         errmsg("cannot add schema to publication \"%s\"",
    1300             :                                stmt->pubname),
    1301             :                         errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
    1302             : 
    1303           8 :             ReleaseSysCache(coltuple);
    1304             :         }
    1305             : 
    1306          22 :         PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
    1307             :     }
    1308         430 :     else if (stmt->action == AP_DropObjects)
    1309          38 :         PublicationDropSchemas(pubform->oid, schemaidlist, false);
    1310             :     else                        /* AP_SetObjects */
    1311             :     {
    1312         392 :         List       *oldschemaids = GetPublicationSchemas(pubform->oid);
    1313         392 :         List       *delschemas = NIL;
    1314             : 
    1315             :         /* Identify which schemas should be dropped */
    1316         392 :         delschemas = list_difference_oid(oldschemaids, schemaidlist);
    1317             : 
    1318             :         /*
    1319             :          * Schema lock is held until the publication is altered to prevent
    1320             :          * concurrent schema deletion.
    1321             :          */
    1322         392 :         LockSchemaList(delschemas);
    1323             : 
    1324             :         /* And drop them */
    1325         392 :         PublicationDropSchemas(pubform->oid, delschemas, true);
    1326             : 
    1327             :         /*
    1328             :          * Don't bother calculating the difference for adding, we'll catch and
    1329             :          * skip existing ones when doing catalog update.
    1330             :          */
    1331         392 :         PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
    1332             :     }
    1333             : }
    1334             : 
    1335             : /*
    1336             :  * Check if relations and schemas can be in a given publication and throw
    1337             :  * appropriate error if not.
    1338             :  */
    1339             : static void
    1340         884 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
    1341             :                       List *tables, List *schemaidlist)
    1342             : {
    1343         884 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1344             : 
    1345         884 :     if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
    1346          94 :         schemaidlist && !superuser())
    1347           6 :         ereport(ERROR,
    1348             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1349             :                  errmsg("must be superuser to add or set schemas")));
    1350             : 
    1351             :     /*
    1352             :      * Check that user is allowed to manipulate the publication tables in
    1353             :      * schema
    1354             :      */
    1355         878 :     if (schemaidlist && pubform->puballtables)
    1356          18 :         ereport(ERROR,
    1357             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1358             :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1359             :                         NameStr(pubform->pubname)),
    1360             :                  errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
    1361             : 
    1362             :     /* Check that user is allowed to manipulate the publication tables. */
    1363         860 :     if (tables && pubform->puballtables)
    1364          18 :         ereport(ERROR,
    1365             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1366             :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1367             :                         NameStr(pubform->pubname)),
    1368             :                  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
    1369         842 : }
    1370             : 
    1371             : /*
    1372             :  * Alter the existing publication.
    1373             :  *
    1374             :  * This is dispatcher function for AlterPublicationOptions,
    1375             :  * AlterPublicationSchemas and AlterPublicationTables.
    1376             :  */
    1377             : void
    1378        1012 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
    1379             : {
    1380             :     Relation    rel;
    1381             :     HeapTuple   tup;
    1382             :     Form_pg_publication pubform;
    1383             : 
    1384        1012 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1385             : 
    1386        1012 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
    1387             :                               CStringGetDatum(stmt->pubname));
    1388             : 
    1389        1012 :     if (!HeapTupleIsValid(tup))
    1390           0 :         ereport(ERROR,
    1391             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1392             :                  errmsg("publication \"%s\" does not exist",
    1393             :                         stmt->pubname)));
    1394             : 
    1395        1012 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1396             : 
    1397             :     /* must be owner */
    1398        1012 :     if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
    1399           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1400           0 :                        stmt->pubname);
    1401             : 
    1402        1012 :     if (stmt->options)
    1403         110 :         AlterPublicationOptions(pstate, stmt, rel, tup);
    1404             :     else
    1405             :     {
    1406         902 :         List       *relations = NIL;
    1407         902 :         List       *schemaidlist = NIL;
    1408         902 :         Oid         pubid = pubform->oid;
    1409             : 
    1410         902 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
    1411             :                                    &schemaidlist);
    1412             : 
    1413         884 :         CheckAlterPublication(stmt, tup, relations, schemaidlist);
    1414             : 
    1415         842 :         heap_freetuple(tup);
    1416             : 
    1417             :         /* Lock the publication so nobody else can do anything with it. */
    1418         842 :         LockDatabaseObject(PublicationRelationId, pubid, 0,
    1419             :                            AccessExclusiveLock);
    1420             : 
    1421             :         /*
    1422             :          * It is possible that by the time we acquire the lock on publication,
    1423             :          * concurrent DDL has removed it. We can test this by checking the
    1424             :          * existence of publication. We get the tuple again to avoid the risk
    1425             :          * of any publication option getting changed.
    1426             :          */
    1427         842 :         tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1428         842 :         if (!HeapTupleIsValid(tup))
    1429           0 :             ereport(ERROR,
    1430             :                     errcode(ERRCODE_UNDEFINED_OBJECT),
    1431             :                     errmsg("publication \"%s\" does not exist",
    1432             :                            stmt->pubname));
    1433             : 
    1434         842 :         AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
    1435             :                                schemaidlist != NIL);
    1436         728 :         AlterPublicationSchemas(stmt, tup, schemaidlist);
    1437             :     }
    1438             : 
    1439             :     /* Cleanup. */
    1440         808 :     heap_freetuple(tup);
    1441         808 :     table_close(rel, RowExclusiveLock);
    1442         808 : }
    1443             : 
    1444             : /*
    1445             :  * Remove relation from publication by mapping OID.
    1446             :  */
    1447             : void
    1448         748 : RemovePublicationRelById(Oid proid)
    1449             : {
    1450             :     Relation    rel;
    1451             :     HeapTuple   tup;
    1452             :     Form_pg_publication_rel pubrel;
    1453         748 :     List       *relids = NIL;
    1454             : 
    1455         748 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
    1456             : 
    1457         748 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
    1458             : 
    1459         748 :     if (!HeapTupleIsValid(tup))
    1460           0 :         elog(ERROR, "cache lookup failed for publication table %u",
    1461             :              proid);
    1462             : 
    1463         748 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
    1464             : 
    1465             :     /*
    1466             :      * Invalidate relcache so that publication info is rebuilt.
    1467             :      *
    1468             :      * For the partitioned tables, we must invalidate all partitions contained
    1469             :      * in the respective partition hierarchies, not just the one explicitly
    1470             :      * mentioned in the publication. This is required because we implicitly
    1471             :      * publish the child tables when the parent table is published.
    1472             :      */
    1473         748 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
    1474             :                                             pubrel->prrelid);
    1475             : 
    1476         748 :     InvalidatePublicationRels(relids);
    1477             : 
    1478         748 :     CatalogTupleDelete(rel, &tup->t_self);
    1479             : 
    1480         748 :     ReleaseSysCache(tup);
    1481             : 
    1482         748 :     table_close(rel, RowExclusiveLock);
    1483         748 : }
    1484             : 
    1485             : /*
    1486             :  * Remove the publication by mapping OID.
    1487             :  */
    1488             : void
    1489         356 : RemovePublicationById(Oid pubid)
    1490             : {
    1491             :     Relation    rel;
    1492             :     HeapTuple   tup;
    1493             :     Form_pg_publication pubform;
    1494             : 
    1495         356 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1496             : 
    1497         356 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1498         356 :     if (!HeapTupleIsValid(tup))
    1499           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1500             : 
    1501         356 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1502             : 
    1503             :     /* Invalidate relcache so that publication info is rebuilt. */
    1504         356 :     if (pubform->puballtables)
    1505          20 :         CacheInvalidateRelcacheAll();
    1506             : 
    1507         356 :     CatalogTupleDelete(rel, &tup->t_self);
    1508             : 
    1509         356 :     ReleaseSysCache(tup);
    1510             : 
    1511         356 :     table_close(rel, RowExclusiveLock);
    1512         356 : }
    1513             : 
    1514             : /*
    1515             :  * Remove schema from publication by mapping OID.
    1516             :  */
    1517             : void
    1518         192 : RemovePublicationSchemaById(Oid psoid)
    1519             : {
    1520             :     Relation    rel;
    1521             :     HeapTuple   tup;
    1522         192 :     List       *schemaRels = NIL;
    1523             :     Form_pg_publication_namespace pubsch;
    1524             : 
    1525         192 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
    1526             : 
    1527         192 :     tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
    1528             : 
    1529         192 :     if (!HeapTupleIsValid(tup))
    1530           0 :         elog(ERROR, "cache lookup failed for publication schema %u", psoid);
    1531             : 
    1532         192 :     pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
    1533             : 
    1534             :     /*
    1535             :      * Invalidate relcache so that publication info is rebuilt. See
    1536             :      * RemovePublicationRelById for why we need to consider all the
    1537             :      * partitions.
    1538             :      */
    1539         192 :     schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
    1540             :                                                PUBLICATION_PART_ALL);
    1541         192 :     InvalidatePublicationRels(schemaRels);
    1542             : 
    1543         192 :     CatalogTupleDelete(rel, &tup->t_self);
    1544             : 
    1545         192 :     ReleaseSysCache(tup);
    1546             : 
    1547         192 :     table_close(rel, RowExclusiveLock);
    1548         192 : }
    1549             : 
    1550             : /*
    1551             :  * Open relations specified by a PublicationTable list.
    1552             :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
    1553             :  * add them to a publication.
    1554             :  */
    1555             : static List *
    1556        1140 : OpenTableList(List *tables)
    1557             : {
    1558        1140 :     List       *relids = NIL;
    1559        1140 :     List       *rels = NIL;
    1560             :     ListCell   *lc;
    1561        1140 :     List       *relids_with_rf = NIL;
    1562        1140 :     List       *relids_with_collist = NIL;
    1563             : 
    1564             :     /*
    1565             :      * Open, share-lock, and check all the explicitly-specified relations
    1566             :      */
    1567        2338 :     foreach(lc, tables)
    1568             :     {
    1569        1222 :         PublicationTable *t = lfirst_node(PublicationTable, lc);
    1570        1222 :         bool        recurse = t->relation->inh;
    1571             :         Relation    rel;
    1572             :         Oid         myrelid;
    1573             :         PublicationRelInfo *pub_rel;
    1574             : 
    1575             :         /* Allow query cancel in case this takes a long time */
    1576        1222 :         CHECK_FOR_INTERRUPTS();
    1577             : 
    1578        1222 :         rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
    1579        1222 :         myrelid = RelationGetRelid(rel);
    1580             : 
    1581             :         /*
    1582             :          * Filter out duplicates if user specifies "foo, foo".
    1583             :          *
    1584             :          * Note that this algorithm is known to not be very efficient (O(N^2))
    1585             :          * but given that it only works on list of tables given to us by user
    1586             :          * it's deemed acceptable.
    1587             :          */
    1588        1222 :         if (list_member_oid(relids, myrelid))
    1589             :         {
    1590             :             /* Disallow duplicate tables if there are any with row filters. */
    1591          24 :             if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
    1592          12 :                 ereport(ERROR,
    1593             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1594             :                          errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1595             :                                 RelationGetRelationName(rel))));
    1596             : 
    1597             :             /* Disallow duplicate tables if there are any with column lists. */
    1598          12 :             if (t->columns || list_member_oid(relids_with_collist, myrelid))
    1599          12 :                 ereport(ERROR,
    1600             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1601             :                          errmsg("conflicting or redundant column lists for table \"%s\"",
    1602             :                                 RelationGetRelationName(rel))));
    1603             : 
    1604           0 :             table_close(rel, ShareUpdateExclusiveLock);
    1605           0 :             continue;
    1606             :         }
    1607             : 
    1608        1198 :         pub_rel = palloc(sizeof(PublicationRelInfo));
    1609        1198 :         pub_rel->relation = rel;
    1610        1198 :         pub_rel->whereClause = t->whereClause;
    1611        1198 :         pub_rel->columns = t->columns;
    1612        1198 :         rels = lappend(rels, pub_rel);
    1613        1198 :         relids = lappend_oid(relids, myrelid);
    1614             : 
    1615        1198 :         if (t->whereClause)
    1616         372 :             relids_with_rf = lappend_oid(relids_with_rf, myrelid);
    1617             : 
    1618        1198 :         if (t->columns)
    1619         332 :             relids_with_collist = lappend_oid(relids_with_collist, myrelid);
    1620             : 
    1621             :         /*
    1622             :          * Add children of this rel, if requested, so that they too are added
    1623             :          * to the publication.  A partitioned table can't have any inheritance
    1624             :          * children other than its partitions, which need not be explicitly
    1625             :          * added to the publication.
    1626             :          */
    1627        1198 :         if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
    1628             :         {
    1629             :             List       *children;
    1630             :             ListCell   *child;
    1631             : 
    1632        1016 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
    1633             :                                            NULL);
    1634             : 
    1635        2040 :             foreach(child, children)
    1636             :             {
    1637        1024 :                 Oid         childrelid = lfirst_oid(child);
    1638             : 
    1639             :                 /* Allow query cancel in case this takes a long time */
    1640        1024 :                 CHECK_FOR_INTERRUPTS();
    1641             : 
    1642             :                 /*
    1643             :                  * Skip duplicates if user specified both parent and child
    1644             :                  * tables.
    1645             :                  */
    1646        1024 :                 if (list_member_oid(relids, childrelid))
    1647             :                 {
    1648             :                     /*
    1649             :                      * We don't allow to specify row filter for both parent
    1650             :                      * and child table at the same time as it is not very
    1651             :                      * clear which one should be given preference.
    1652             :                      */
    1653        1016 :                     if (childrelid != myrelid &&
    1654           0 :                         (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
    1655           0 :                         ereport(ERROR,
    1656             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1657             :                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1658             :                                         RelationGetRelationName(rel))));
    1659             : 
    1660             :                     /*
    1661             :                      * We don't allow to specify column list for both parent
    1662             :                      * and child table at the same time as it is not very
    1663             :                      * clear which one should be given preference.
    1664             :                      */
    1665        1016 :                     if (childrelid != myrelid &&
    1666           0 :                         (t->columns || list_member_oid(relids_with_collist, childrelid)))
    1667           0 :                         ereport(ERROR,
    1668             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1669             :                                  errmsg("conflicting or redundant column lists for table \"%s\"",
    1670             :                                         RelationGetRelationName(rel))));
    1671             : 
    1672        1016 :                     continue;
    1673             :                 }
    1674             : 
    1675             :                 /* find_all_inheritors already got lock */
    1676           8 :                 rel = table_open(childrelid, NoLock);
    1677           8 :                 pub_rel = palloc(sizeof(PublicationRelInfo));
    1678           8 :                 pub_rel->relation = rel;
    1679             :                 /* child inherits WHERE clause from parent */
    1680           8 :                 pub_rel->whereClause = t->whereClause;
    1681             : 
    1682             :                 /* child inherits column list from parent */
    1683           8 :                 pub_rel->columns = t->columns;
    1684           8 :                 rels = lappend(rels, pub_rel);
    1685           8 :                 relids = lappend_oid(relids, childrelid);
    1686             : 
    1687           8 :                 if (t->whereClause)
    1688           2 :                     relids_with_rf = lappend_oid(relids_with_rf, childrelid);
    1689             : 
    1690           8 :                 if (t->columns)
    1691           0 :                     relids_with_collist = lappend_oid(relids_with_collist, childrelid);
    1692             :             }
    1693             :         }
    1694             :     }
    1695             : 
    1696        1116 :     list_free(relids);
    1697        1116 :     list_free(relids_with_rf);
    1698             : 
    1699        1116 :     return rels;
    1700             : }
    1701             : 
    1702             : /*
    1703             :  * Close all relations in the list.
    1704             :  */
    1705             : static void
    1706        1338 : CloseTableList(List *rels)
    1707             : {
    1708             :     ListCell   *lc;
    1709             : 
    1710        2712 :     foreach(lc, rels)
    1711             :     {
    1712             :         PublicationRelInfo *pub_rel;
    1713             : 
    1714        1374 :         pub_rel = (PublicationRelInfo *) lfirst(lc);
    1715        1374 :         table_close(pub_rel->relation, NoLock);
    1716             :     }
    1717             : 
    1718        1338 :     list_free_deep(rels);
    1719        1338 : }
    1720             : 
    1721             : /*
    1722             :  * Lock the schemas specified in the schema list in AccessShareLock mode in
    1723             :  * order to prevent concurrent schema deletion.
    1724             :  */
    1725             : static void
    1726         984 : LockSchemaList(List *schemalist)
    1727             : {
    1728             :     ListCell   *lc;
    1729             : 
    1730        1268 :     foreach(lc, schemalist)
    1731             :     {
    1732         284 :         Oid         schemaid = lfirst_oid(lc);
    1733             : 
    1734             :         /* Allow query cancel in case this takes a long time */
    1735         284 :         CHECK_FOR_INTERRUPTS();
    1736         284 :         LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
    1737             : 
    1738             :         /*
    1739             :          * It is possible that by the time we acquire the lock on schema,
    1740             :          * concurrent DDL has removed it. We can test this by checking the
    1741             :          * existence of schema.
    1742             :          */
    1743         284 :         if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
    1744           0 :             ereport(ERROR,
    1745             :                     errcode(ERRCODE_UNDEFINED_SCHEMA),
    1746             :                     errmsg("schema with OID %u does not exist", schemaid));
    1747             :     }
    1748         984 : }
    1749             : 
    1750             : /*
    1751             :  * Add listed tables to the publication.
    1752             :  */
    1753             : static void
    1754         934 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
    1755             :                      AlterPublicationStmt *stmt)
    1756             : {
    1757             :     ListCell   *lc;
    1758             : 
    1759             :     Assert(!stmt || !stmt->for_all_tables);
    1760             : 
    1761        1866 :     foreach(lc, rels)
    1762             :     {
    1763         994 :         PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
    1764         994 :         Relation    rel = pub_rel->relation;
    1765             :         ObjectAddress obj;
    1766             : 
    1767             :         /* Must be owner of the table or superuser. */
    1768         994 :         if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
    1769           6 :             aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
    1770           6 :                            RelationGetRelationName(rel));
    1771             : 
    1772         988 :         obj = publication_add_relation(pubid, pub_rel, if_not_exists);
    1773         932 :         if (stmt)
    1774             :         {
    1775         586 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1776             :                                              (Node *) stmt);
    1777             : 
    1778         586 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
    1779             :                                        obj.objectId, 0);
    1780             :         }
    1781             :     }
    1782         872 : }
    1783             : 
    1784             : /*
    1785             :  * Remove listed tables from the publication.
    1786             :  */
    1787             : static void
    1788         484 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
    1789             : {
    1790             :     ObjectAddress obj;
    1791             :     ListCell   *lc;
    1792             :     Oid         prid;
    1793             : 
    1794         926 :     foreach(lc, rels)
    1795             :     {
    1796         460 :         PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
    1797         460 :         Relation    rel = pubrel->relation;
    1798         460 :         Oid         relid = RelationGetRelid(rel);
    1799             : 
    1800         460 :         if (pubrel->columns)
    1801           0 :             ereport(ERROR,
    1802             :                     errcode(ERRCODE_SYNTAX_ERROR),
    1803             :                     errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
    1804             : 
    1805         460 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
    1806             :                                ObjectIdGetDatum(relid),
    1807             :                                ObjectIdGetDatum(pubid));
    1808         460 :         if (!OidIsValid(prid))
    1809             :         {
    1810          12 :             if (missing_ok)
    1811           0 :                 continue;
    1812             : 
    1813          12 :             ereport(ERROR,
    1814             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1815             :                      errmsg("relation \"%s\" is not part of the publication",
    1816             :                             RelationGetRelationName(rel))));
    1817             :         }
    1818             : 
    1819         448 :         if (pubrel->whereClause)
    1820           6 :             ereport(ERROR,
    1821             :                     (errcode(ERRCODE_SYNTAX_ERROR),
    1822             :                      errmsg("cannot use a WHERE clause when removing a table from a publication")));
    1823             : 
    1824         442 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
    1825         442 :         performDeletion(&obj, DROP_CASCADE, 0);
    1826             :     }
    1827         466 : }
    1828             : 
    1829             : /*
    1830             :  * Add listed schemas to the publication.
    1831             :  */
    1832             : static void
    1833         548 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
    1834             :                       AlterPublicationStmt *stmt)
    1835             : {
    1836             :     ListCell   *lc;
    1837             : 
    1838             :     Assert(!stmt || !stmt->for_all_tables);
    1839             : 
    1840         758 :     foreach(lc, schemas)
    1841             :     {
    1842         222 :         Oid         schemaid = lfirst_oid(lc);
    1843             :         ObjectAddress obj;
    1844             : 
    1845         222 :         obj = publication_add_schema(pubid, schemaid, if_not_exists);
    1846         210 :         if (stmt)
    1847             :         {
    1848          58 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1849             :                                              (Node *) stmt);
    1850             : 
    1851          58 :             InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
    1852             :                                        obj.objectId, 0);
    1853             :         }
    1854             :     }
    1855         536 : }
    1856             : 
    1857             : /*
    1858             :  * Remove listed schemas from the publication.
    1859             :  */
    1860             : static void
    1861         430 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
    1862             : {
    1863             :     ObjectAddress obj;
    1864             :     ListCell   *lc;
    1865             :     Oid         psid;
    1866             : 
    1867         480 :     foreach(lc, schemas)
    1868             :     {
    1869          56 :         Oid         schemaid = lfirst_oid(lc);
    1870             : 
    1871          56 :         psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
    1872             :                                Anum_pg_publication_namespace_oid,
    1873             :                                ObjectIdGetDatum(schemaid),
    1874             :                                ObjectIdGetDatum(pubid));
    1875          56 :         if (!OidIsValid(psid))
    1876             :         {
    1877           6 :             if (missing_ok)
    1878           0 :                 continue;
    1879             : 
    1880           6 :             ereport(ERROR,
    1881             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1882             :                      errmsg("tables from schema \"%s\" are not part of the publication",
    1883             :                             get_namespace_name(schemaid))));
    1884             :         }
    1885             : 
    1886          50 :         ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
    1887          50 :         performDeletion(&obj, DROP_CASCADE, 0);
    1888             :     }
    1889         424 : }
    1890             : 
    1891             : /*
    1892             :  * Internal workhorse for changing a publication owner
    1893             :  */
    1894             : static void
    1895          24 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    1896             : {
    1897             :     Form_pg_publication form;
    1898             : 
    1899          24 :     form = (Form_pg_publication) GETSTRUCT(tup);
    1900             : 
    1901          24 :     if (form->pubowner == newOwnerId)
    1902           0 :         return;
    1903             : 
    1904          24 :     if (!superuser())
    1905             :     {
    1906             :         AclResult   aclresult;
    1907             : 
    1908             :         /* Must be owner */
    1909          12 :         if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
    1910           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1911           0 :                            NameStr(form->pubname));
    1912             : 
    1913             :         /* Must be able to become new owner */
    1914          12 :         check_can_set_role(GetUserId(), newOwnerId);
    1915             : 
    1916             :         /* New owner must have CREATE privilege on database */
    1917          12 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
    1918          12 :         if (aclresult != ACLCHECK_OK)
    1919           0 :             aclcheck_error(aclresult, OBJECT_DATABASE,
    1920           0 :                            get_database_name(MyDatabaseId));
    1921             : 
    1922          12 :         if (form->puballtables && !superuser_arg(newOwnerId))
    1923           0 :             ereport(ERROR,
    1924             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1925             :                      errmsg("permission denied to change owner of publication \"%s\"",
    1926             :                             NameStr(form->pubname)),
    1927             :                      errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
    1928             : 
    1929          12 :         if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
    1930           6 :             ereport(ERROR,
    1931             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1932             :                      errmsg("permission denied to change owner of publication \"%s\"",
    1933             :                             NameStr(form->pubname)),
    1934             :                      errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
    1935             :     }
    1936             : 
    1937          18 :     form->pubowner = newOwnerId;
    1938          18 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1939             : 
    1940             :     /* Update owner dependency reference */
    1941          18 :     changeDependencyOnOwner(PublicationRelationId,
    1942             :                             form->oid,
    1943             :                             newOwnerId);
    1944             : 
    1945          18 :     InvokeObjectPostAlterHook(PublicationRelationId,
    1946             :                               form->oid, 0);
    1947             : }
    1948             : 
    1949             : /*
    1950             :  * Change publication owner -- by name
    1951             :  */
    1952             : ObjectAddress
    1953          24 : AlterPublicationOwner(const char *name, Oid newOwnerId)
    1954             : {
    1955             :     Oid         subid;
    1956             :     HeapTuple   tup;
    1957             :     Relation    rel;
    1958             :     ObjectAddress address;
    1959             :     Form_pg_publication pubform;
    1960             : 
    1961          24 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1962             : 
    1963          24 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
    1964             : 
    1965          24 :     if (!HeapTupleIsValid(tup))
    1966           0 :         ereport(ERROR,
    1967             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1968             :                  errmsg("publication \"%s\" does not exist", name)));
    1969             : 
    1970          24 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1971          24 :     subid = pubform->oid;
    1972             : 
    1973          24 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    1974             : 
    1975          18 :     ObjectAddressSet(address, PublicationRelationId, subid);
    1976             : 
    1977          18 :     heap_freetuple(tup);
    1978             : 
    1979          18 :     table_close(rel, RowExclusiveLock);
    1980             : 
    1981          18 :     return address;
    1982             : }
    1983             : 
    1984             : /*
    1985             :  * Change publication owner -- by OID
    1986             :  */
    1987             : void
    1988           0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
    1989             : {
    1990             :     HeapTuple   tup;
    1991             :     Relation    rel;
    1992             : 
    1993           0 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1994             : 
    1995           0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
    1996             : 
    1997           0 :     if (!HeapTupleIsValid(tup))
    1998           0 :         ereport(ERROR,
    1999             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2000             :                  errmsg("publication with OID %u does not exist", subid)));
    2001             : 
    2002           0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2003             : 
    2004           0 :     heap_freetuple(tup);
    2005             : 
    2006           0 :     table_close(rel, RowExclusiveLock);
    2007           0 : }

Generated by: LCOV version 1.14