LCOV - code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 601 648 92.7 %
Date: 2024-04-13 09:11:47 Functions: 29 30 96.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * publicationcmds.c
       4             :  *      publication manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/commands/publicationcmds.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/htup_details.h"
      18             : #include "access/table.h"
      19             : #include "access/xact.h"
      20             : #include "catalog/catalog.h"
      21             : #include "catalog/indexing.h"
      22             : #include "catalog/namespace.h"
      23             : #include "catalog/objectaccess.h"
      24             : #include "catalog/objectaddress.h"
      25             : #include "catalog/pg_database.h"
      26             : #include "catalog/pg_inherits.h"
      27             : #include "catalog/pg_namespace.h"
      28             : #include "catalog/pg_proc.h"
      29             : #include "catalog/pg_publication.h"
      30             : #include "catalog/pg_publication_namespace.h"
      31             : #include "catalog/pg_publication_rel.h"
      32             : #include "commands/dbcommands.h"
      33             : #include "commands/defrem.h"
      34             : #include "commands/event_trigger.h"
      35             : #include "commands/publicationcmds.h"
      36             : #include "miscadmin.h"
      37             : #include "nodes/nodeFuncs.h"
      38             : #include "parser/parse_clause.h"
      39             : #include "parser/parse_collate.h"
      40             : #include "parser/parse_relation.h"
      41             : #include "storage/lmgr.h"
      42             : #include "utils/acl.h"
      43             : #include "utils/builtins.h"
      44             : #include "utils/inval.h"
      45             : #include "utils/lsyscache.h"
      46             : #include "utils/rel.h"
      47             : #include "utils/syscache.h"
      48             : #include "utils/varlena.h"
      49             : 
      50             : 
      51             : /*
      52             :  * Walker context for contain_invalid_rfcolumn_walker: carries what is needed
      53             :  * to test each Var of a row filter against the replica identity columns.
      54             :  */
      55             : typedef struct rf_context
      56             : {
      57             :     Bitmapset  *bms_replident;  /* replica identity columns; members are (attnum - FirstLowInvalidHeapAttributeNumber) */
      58             :     bool        pubviaroot;     /* if true, Var attnums are numbered per parentid
      59             :                                  * and get translated to relid by column name */
      60             :     Oid         relid;          /* relation whose replica identity is checked */
      61             :     Oid         parentid;       /* relation whose row filter is being validated */
      62             : } rf_context;
      63             : 
      64             : static List *OpenTableList(List *tables);
      65             : static void CloseTableList(List *rels);
      66             : static void LockSchemaList(List *schemalist);
      67             : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
      68             :                                  AlterPublicationStmt *stmt);
      69             : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
      70             : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
      71             :                                   AlterPublicationStmt *stmt);
      72             : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
      73             : 
      74             : /* Parse the DefElem list 'options' ("publish", "publish_via_partition_root") into the output arguments; the *_given flags report which options were present. */
      75             : static void
      76         782 : parse_publication_options(ParseState *pstate,
      77             :                           List *options,
      78             :                           bool *publish_given,
      79             :                           PublicationActions *pubactions,
      80             :                           bool *publish_via_partition_root_given,
      81             :                           bool *publish_via_partition_root)
      82             : {
      83             :     ListCell   *lc;
      84             : 
      85         782 :     *publish_given = false;
      86         782 :     *publish_via_partition_root_given = false;
      87             : 
      88             :     /* Defaults: publish all four actions, not via the partition root. */
      89         782 :     pubactions->pubinsert = true;
      90         782 :     pubactions->pubupdate = true;
      91         782 :     pubactions->pubdelete = true;
      92         782 :     pubactions->pubtruncate = true;
      93         782 :     *publish_via_partition_root = false;
      94             : 
      95             :     /* Walk the options; a repeated or unknown option is reported as an error. */
      96        1054 :     foreach(lc, options)
      97             :     {
      98         290 :         DefElem    *defel = (DefElem *) lfirst(lc);
      99             : 
     100         290 :         if (strcmp(defel->defname, "publish") == 0)
     101             :         {
     102             :             char       *publish;
     103             :             List       *publish_list;
     104             :             ListCell   *lc2;
     105             : 
     106         122 :             if (*publish_given)
     107           0 :                 errorConflictingDefElem(defel, pstate);
     108             : 
     109             :             /*
     110             :              * An explicit "publish" list replaces the defaults: clear every
     111             :              * action here, then enable only those named in the list below.
     112             :              */
     113         122 :             pubactions->pubinsert = false;
     114         122 :             pubactions->pubupdate = false;
     115         122 :             pubactions->pubdelete = false;
     116         122 :             pubactions->pubtruncate = false;
     117             : 
     118         122 :             *publish_given = true;
     119         122 :             publish = defGetString(defel);
     120             : 
     121         122 :             if (!SplitIdentifierString(publish, ',', &publish_list))
     122           0 :                 ereport(ERROR,
     123             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     124             :                          errmsg("invalid list syntax in parameter \"%s\"",
     125             :                                 "publish")));
     126             : 
     127             :             /* Enable each listed action; any other word is a syntax error. */
     128         270 :             foreach(lc2, publish_list)
     129             :             {
     130         154 :                 char       *publish_opt = (char *) lfirst(lc2);
     131             : 
     132         154 :                 if (strcmp(publish_opt, "insert") == 0)
     133         108 :                     pubactions->pubinsert = true;
     134          46 :                 else if (strcmp(publish_opt, "update") == 0)
     135          20 :                     pubactions->pubupdate = true;
     136          26 :                 else if (strcmp(publish_opt, "delete") == 0)
     137          10 :                     pubactions->pubdelete = true;
     138          16 :                 else if (strcmp(publish_opt, "truncate") == 0)
     139          10 :                     pubactions->pubtruncate = true;
     140             :                 else
     141           6 :                     ereport(ERROR,
     142             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     143             :                              errmsg("unrecognized value for publication option \"%s\": \"%s\"",
     144             :                                     "publish", publish_opt)));
     145             :             }
     146             :         }
     147         168 :         else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
     148             :         {
     149         162 :             if (*publish_via_partition_root_given)
     150           6 :                 errorConflictingDefElem(defel, pstate);
     151         156 :             *publish_via_partition_root_given = true;
     152         156 :             *publish_via_partition_root = defGetBoolean(defel);
     153             :         }
     154             :         else
     155           6 :             ereport(ERROR,
     156             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     157             :                      errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
     158             :     }
     159         764 : }
     160             : 
     161             : /*
     162             :  * Split a list of PublicationObjSpec into *rels (each table entry's pubtable)
     163             :  * and *schemas (namespace OIDs, duplicates dropped).  Both lists are appended to.
     164             :  */
     165             : static void
     166        1500 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
     167             :                            List **rels, List **schemas)
     168             : {
     169             :     ListCell   *cell;
     170             :     PublicationObjSpec *pubobj;
     171             : 
     172        1500 :     if (!pubobjspec_list)
     173          84 :         return;
     174             : 
     175        2996 :     foreach(cell, pubobjspec_list)
     176             :     {
     177             :         Oid         schemaid;
     178             :         List       *search_path;
     179             : 
     180        1616 :         pubobj = (PublicationObjSpec *) lfirst(cell);
     181             : 
     182        1616 :         switch (pubobj->pubobjtype)
     183             :         {
     184        1260 :             case PUBLICATIONOBJ_TABLE:
     185        1260 :                 *rels = lappend(*rels, pubobj->pubtable);
     186        1260 :                 break;
     187         332 :             case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
     188         332 :                 schemaid = get_namespace_oid(pubobj->name, false);
     189             : 
     190             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     191         302 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     192         302 :                 break;
     193          24 :             case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
     194          24 :                 search_path = fetch_search_path(false);
     195          24 :                 if (search_path == NIL) /* nothing valid in search_path? */
     196           6 :                     ereport(ERROR,
     197             :                             errcode(ERRCODE_UNDEFINED_SCHEMA),
     198             :                             errmsg("no schema has been selected for CURRENT_SCHEMA"));
     199             : 
     200          18 :                 schemaid = linitial_oid(search_path);
     201          18 :                 list_free(search_path);
     202             : 
     203             :                 /* Filter out duplicates, e.g. the same schema already added above */
     204          18 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     205          18 :                 break;
     206           0 :             default:
     207             :                 /* shouldn't happen */
     208           0 :                 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
     209             :                 break;
     210             :         }
     211             :     }
     212             : }
     213             : 
     214             : /*
     215             :  * Expression walker: returns true as soon as it meets a Var whose column is
     216             :  * not in context->bms_replident (the REPLICA IDENTITY), false otherwise.
     217             :  */
     218             : static bool
     219         258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
     220             : {
     221         258 :     if (node == NULL)
     222           0 :         return false;
     223             : 
     224         258 :     if (IsA(node, Var))
     225             :     {
     226         104 :         Var        *var = (Var *) node;
     227         104 :         AttrNumber  attnum = var->varattno;
     228             : 
     229             :         /*
     230             :          * If pubviaroot is true, the Var is numbered per the parent table
     231             :          * (context->parentid) while the bitmap describes the child
     232             :          * (context->relid).  Column order may differ between the two, so map
     233             :          * the parent attnum to the child attnum through the column name.
     234             :          */
     235         104 :         if (context->pubviaroot)
     236             :         {
     237          16 :             char       *colname = get_attname(context->parentid, attnum, false);
     238             : 
     239          16 :             attnum = get_attnum(context->relid, colname);
     240             :         }
     241             : 
     242         104 :         if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
     243         104 :                            context->bms_replident))
     244          60 :             return true;
     245             :     }
     246             : 
     247         198 :     return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
     248             :                                   (void *) context);
     249             : }
     250             : 
     251             : /*
     252             :  * Check whether the row filter applying to 'relation' in publication 'pubid'
     253             :  * references only REPLICA IDENTITY columns.  Returns true if an invalid
     254             :  * column is found; false otherwise, including when no matching
     255             :  * PUBLICATIONRELMAP entry exists or that entry has no row filter.
     256             :  */
     257             : bool
     258         662 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     259             :                                bool pubviaroot)
     260             : {
     261             :     HeapTuple   rftuple;
     262         662 :     Oid         relid = RelationGetRelid(relation);
     263         662 :     Oid         publish_as_relid = RelationGetRelid(relation);
     264         662 :     bool        result = false;
     265             :     Datum       rfdatum;
     266             :     bool        rfisnull;
     267             : 
     268             :     /*
     269             :      * FULL means all columns are in the REPLICA IDENTITY, so all columns are
     270             :      * allowed in the row filter and we can skip the validation.
     271             :      */
     272         662 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     273         156 :         return false;
     274             : 
     275             :     /*
     276             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     277             :      * is published via this publication as we need to use its row filter
     278             :      * expression to filter the partition's changes.  If no ancestor is
     279             :      * published, fall back to the relation itself.
     280             :      *
     281             :      * Note that even though the row filter used is for an ancestor, the
     282             :      * REPLICA IDENTITY used will be for the actual child table.
     283         506 :     if (pubviaroot && relation->rd_rel->relispartition)
     284             :     {
     285             :         publish_as_relid
     286         106 :             = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     287             : 
     288         106 :         if (!OidIsValid(publish_as_relid))
     289           6 :             publish_as_relid = relid;
     290             :     }
     291             : 
     292         506 :     rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     293             :                               ObjectIdGetDatum(publish_as_relid),
     294             :                               ObjectIdGetDatum(pubid));
     295             : 
     296         506 :     if (!HeapTupleIsValid(rftuple))
     297          56 :         return false;
     298             : 
     299         450 :     rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     300             :                               Anum_pg_publication_rel_prqual,
     301             :                               &rfisnull);
     302             : 
     303         450 :     if (!rfisnull)
     304             :     {
     305         102 :         rf_context  context = {0};
     306             :         Node       *rfnode;
     307         102 :         Bitmapset  *bms = NULL;
     308             : 
     309         102 :         context.pubviaroot = pubviaroot;
     310         102 :         context.parentid = publish_as_relid;
     311         102 :         context.relid = relid;
     312             : 
     313             :         /* Columns of the relation's own REPLICA IDENTITY, for the walker */
     314         102 :         bms = RelationGetIndexAttrBitmap(relation,
     315             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     316             : 
     317         102 :         context.bms_replident = bms;
     318         102 :         rfnode = stringToNode(TextDatumGetCString(rfdatum));
     319         102 :         result = contain_invalid_rfcolumn_walker(rfnode, &context);
     320             :     }
     321             : 
     322         450 :     ReleaseSysCache(rftuple);
     323             : 
     324         450 :     return result;
     325             : }
     326             : 
     327             : /*
     328             :  * Check if all columns referenced in the REPLICA IDENTITY are covered by
     329             :  * the column list that applies to 'relation' in publication 'pubid'.
     330             :  * Returns true if any replica identity column is not covered, or if a column
     331             :  * list exists with REPLICA IDENTITY FULL; false otherwise (also if no list).
     332             :  */
     333             : bool
     334         662 : pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     335             :                                     bool pubviaroot)
     336             : {
     337             :     HeapTuple   tuple;
     338         662 :     Oid         relid = RelationGetRelid(relation);
     339         662 :     Oid         publish_as_relid = RelationGetRelid(relation);
     340         662 :     bool        result = false;
     341             :     Datum       datum;
     342             :     bool        isnull;
     343             : 
     344             :     /*
     345             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     346             :      * is published via this publication as we need to use its column list for
     347             :      * the changes.  If no ancestor is published, use the relation itself.
     348             :      *
     349             :      * Note that even though the column list used is for an ancestor, the
     350             :      * REPLICA IDENTITY used will be for the actual child table.
     351             :      */
     352         662 :     if (pubviaroot && relation->rd_rel->relispartition)
     353             :     {
     354         128 :         publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     355             : 
     356         128 :         if (!OidIsValid(publish_as_relid))
     357           6 :             publish_as_relid = relid;
     358             :     }
     359             : 
     360         662 :     tuple = SearchSysCache2(PUBLICATIONRELMAP,
     361             :                             ObjectIdGetDatum(publish_as_relid),
     362             :                             ObjectIdGetDatum(pubid));
     363             : 
     364         662 :     if (!HeapTupleIsValid(tuple))
     365          64 :         return false;
     366             : 
     367         598 :     datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
     368             :                             Anum_pg_publication_rel_prattrs,
     369             :                             &isnull);
     370             : 
     371         598 :     if (!isnull)
     372             :     {
     373             :         int         x;
     374             :         Bitmapset  *idattrs;
     375         230 :         Bitmapset  *columns = NULL;
     376             : 
     377             :         /* With REPLICA IDENTITY FULL, no column list is allowed; the loop below never resets result. */
     378         230 :         if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     379          36 :             result = true;
     380             : 
     381             :         /* Transform the column list datum to a bitmapset. */
     382         230 :         columns = pub_collist_to_bitmapset(NULL, datum, NULL);
     383             : 
     384             :         /* Remember columns that are part of the REPLICA IDENTITY */
     385         230 :         idattrs = RelationGetIndexAttrBitmap(relation,
     386             :                                              INDEX_ATTR_BITMAP_IDENTITY_KEY);
     387             : 
     388             :         /*
     389             :          * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
     390             :          * offset (to handle system columns the usual way), while column list
     391             :          * does not use offset, so we can't do bms_is_subset(). Instead, we
     392             :          * have to loop over the idattrs, undo the offset, and check that each
     393             :          * one is in the list.
     394             :          */
     395         230 :         x = -1;
     396         350 :         while ((x = bms_next_member(idattrs, x)) >= 0)
     397             :         {
     398         192 :             AttrNumber  attnum = (x + FirstLowInvalidHeapAttributeNumber);
     399             : 
     400             :             /*
     401             :              * If pubviaroot is true, we are validating the column list of the
     402             :              * parent table, but the bitmap contains the replica identity
     403             :              * information of the child table. The parent/child attnums may
     404             :              * not match, so translate them to the parent - get the attname
     405             :              * from the child, and look it up in the parent.
     406             :              */
     407         192 :             if (pubviaroot)
     408             :             {
     409             :                 /* attribute name in the child table */
     410          80 :                 char       *colname = get_attname(relid, attnum, false);
     411             : 
     412             :                 /*
     413             :                  * Determine the attnum for the attribute name in parent (we
     414             :                  * are using the column list defined on the parent).
     415             :                  */
     416          80 :                 attnum = get_attnum(publish_as_relid, colname);
     417             :             }
     418             : 
     419             :             /* replica identity column, not covered by the column list */
     420         192 :             if (!bms_is_member(attnum, columns))
     421             :             {
     422          72 :                 result = true;
     423          72 :                 break;
     424             :             }
     425             :         }
     426             : 
     427         230 :         bms_free(idattrs);
     428         230 :         bms_free(columns);
     429             :     }
     430             : 
     431         598 :     ReleaseSysCache(tuple);
     432             : 
     433         598 :     return result;
     434             : }
     435             : 
     436             : /* check_functions_in_node callback: true if func_id is not immutable or has an OID >= FirstNormalObjectId; 'context' is unused */
     437             : static bool
     438         396 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
     439             : {
     440         396 :     return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
     441             :             func_id >= FirstNormalObjectId);
     442             : }
     443             : 
     444             : /*
     445             :  * Walker that verifies a row filter is a "simple expression", raising an
     446             :  * ERROR (positioned at the offending node) on the first violation found.
     447             :  *
     448             :  * It allows only simple or compound expressions such as:
     449             :  * - (Var Op Const)
     450             :  * - (Var Op Var)
     451             :  * - (Var Op Const) AND/OR (Var Op Const)
     452             :  * - etc
     453             :  * (where Var is a column of the table this filter belongs to)
     454             :  *
     455             :  * The simple expression has the following restrictions:
     456             :  * - User-defined operators are not allowed;
     457             :  * - User-defined functions are not allowed;
     458             :  * - User-defined types are not allowed;
     459             :  * - User-defined collations are not allowed;
     460             :  * - Non-immutable built-in functions are not allowed;
     461             :  * - System columns are not allowed.
     462             :  *
     463             :  * NOTES
     464             :  *
     465             :  * We don't allow user-defined functions/operators/types/collations because
     466             :  * (a) if a user drops a user-defined object used in a row filter expression or
     467             :  * if there is any other error while using it, the logical decoding
     468             :  * infrastructure won't be able to recover from such an error even if the
     469             :  * object is recreated again because a historic snapshot is used to evaluate
     470             :  * the row filter;
     471             :  * (b) a user-defined function can be used to access tables that could have
     472             :  * unpleasant results because a historic snapshot is used. That's why only
     473             :  * immutable built-in functions are allowed in row filter expressions.
     474             :  *
     475             :  * We don't allow system columns because currently, we don't have that
     476             :  * information in the tuple passed to downstream. Also, as we don't replicate
     477             :  * those to subscribers, there doesn't seem to be a need for a filter on those
     478             :  * columns.
     479             :  *
     480             :  * We can allow other node types after more analysis and testing.
     481             :  */
     482             : static bool
     483        1328 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
     484             : {
     485        1328 :     char       *errdetail_msg = NULL;
     486             : 
     487        1328 :     if (node == NULL)
     488           6 :         return false;
     489             : 
     490        1322 :     switch (nodeTag(node))
     491             :     {
     492         356 :         case T_Var:
     493             :             /* System columns (attno below InvalidAttrNumber) are not allowed. */
     494         356 :             if (((Var *) node)->varattno < InvalidAttrNumber)
     495           6 :                 errdetail_msg = _("System columns are not allowed.");
     496         356 :             break;
     497         352 :         case T_OpExpr:
     498             :         case T_DistinctExpr:
     499             :         case T_NullIfExpr:
     500             :             /* OK, except user-defined operators are not allowed. */
     501         352 :             if (((OpExpr *) node)->opno >= FirstNormalObjectId)
     502           6 :                 errdetail_msg = _("User-defined operators are not allowed.");
     503         352 :             break;
     504           6 :         case T_ScalarArrayOpExpr:
     505             :             /* OK, except user-defined operators are not allowed. */
     506           6 :             if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
     507           0 :                 errdetail_msg = _("User-defined operators are not allowed.");
     508             : 
     509             :             /*
     510             :              * We don't need to check the hashfuncid and negfuncid of
     511             :              * ScalarArrayOpExpr as those functions are only built for a
     512             :              * subquery.
     513             :              */
     514           6 :             break;
     515           6 :         case T_RowCompareExpr:
     516             :             {
     517             :                 ListCell   *opid;
     518             : 
     519             :                 /* OK, unless any of the per-column operators is user-defined. */
     520          18 :                 foreach(opid, ((RowCompareExpr *) node)->opnos)
     521             :                 {
     522          12 :                     if (lfirst_oid(opid) >= FirstNormalObjectId)
     523             :                     {
     524           0 :                         errdetail_msg = _("User-defined operators are not allowed.");
     525           0 :                         break;
     526             :                     }
     527             :                 }
     528             :             }
     529           6 :             break;
     530         596 :         case T_Const:
     531             :         case T_FuncExpr:
     532             :         case T_BoolExpr:
     533             :         case T_RelabelType:
     534             :         case T_CollateExpr:
     535             :         case T_CaseExpr:
     536             :         case T_CaseTestExpr:
     537             :         case T_ArrayExpr:
     538             :         case T_RowExpr:
     539             :         case T_CoalesceExpr:
     540             :         case T_MinMaxExpr:
     541             :         case T_XmlExpr:
     542             :         case T_NullTest:
     543             :         case T_BooleanTest:
     544             :         case T_List:
     545             :             /* OK, supported; type, function and collation checks follow below */
     546         596 :             break;
     547           6 :         default:
     548           6 :             errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
     549           6 :             break;
     550             :     }
     551             : 
     552             :     /*
     553             :      * For all the supported nodes, if we haven't already found a problem,
     554             :      * check the types, functions, and collations used in it.  A List is
     555             :      * skipped here; its elements are checked when the walker recurses.
     556             :      */
     557        1322 :     if (!errdetail_msg && !IsA(node, List))
     558             :     {
     559        1250 :         if (exprType(node) >= FirstNormalObjectId)
     560           6 :             errdetail_msg = _("User-defined types are not allowed.");
     561        1244 :         else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
     562             :                                          (void *) pstate))
     563          12 :             errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
     564        2464 :         else if (exprCollation(node) >= FirstNormalObjectId ||
     565        1232 :                  exprInputCollation(node) >= FirstNormalObjectId)
     566           6 :             errdetail_msg = _("User-defined collations are not allowed.");
     567             :     }
     568             : 
     569             :     /*
     570             :      * If we found a problem in this node, throw error now. Otherwise keep
     571             :      * going into the node's children.
     572             :      */
     573        1322 :     if (errdetail_msg)
     574          42 :         ereport(ERROR,
     575             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     576             :                  errmsg("invalid publication WHERE expression"),
     577             :                  errdetail_internal("%s", errdetail_msg),
     578             :                  parser_errposition(pstate, exprLocation(node))));
     579             : 
     580        1280 :     return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
     581             :                                   (void *) pstate);
     582             : }
     583             : 
     584             : /*
     585             :  * Check if the row filter expression is a "simple expression".
     586             :  *
     587             :  * See check_simple_rowfilter_expr_walker for details.
                     :  *
                     :  * 'node' is the expression tree to check.  'pstate' is handed to the walker
                     :  * unchanged; the walker uses it when reporting the error position.
                     :  *
                     :  * This is a thin wrapper that returns the walker's result as-is.  The walker
                     :  * reports a disallowed node by raising an ERROR ("invalid publication WHERE
                     :  * expression").
     588             :  */
     589             : static bool
     590         344 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
     591             : {
     592         344 :     return check_simple_rowfilter_expr_walker(node, pstate);
     593             : }
     594             : 
     595             : /*
     596             :  * Transform the publication WHERE expression for all the relations in the list,
     597             :  * ensuring it is coerced to boolean and necessary collation information is
     598             :  * added if required, and add a new nsitem/RTE for the associated relation to
     599             :  * the ParseState's namespace list.
     600             :  *
     601             :  * Also check the publication row filter expression and throw an error if
     602             :  * anything not permitted or unexpected is encountered.
                     :  *
                     :  * 'tables' is a list of PublicationRelInfo; entries without a WHERE clause
                     :  * are skipped.  'queryString' is installed in each ParseState as
                     :  * p_sourcetext.  'pubviaroot' is the publish_via_partition_root setting; when
                     :  * it is false, a WHERE clause on a partitioned table is rejected.
                     :  *
                     :  * On success each entry's whereClause is replaced by its transformed form.
                     :  * The transformation itself is applied to a copy of the original clause.
     603             :  */
     604             : static void
     605        1042 : TransformPubWhereClauses(List *tables, const char *queryString,
     606             :                          bool pubviaroot)
     607             : {
     608             :     ListCell   *lc;
     609             : 
     610        2086 :     foreach(lc, tables)
     611             :     {
     612             :         ParseNamespaceItem *nsitem;
     613        1104 :         Node       *whereclause = NULL;
     614             :         ParseState *pstate;
     615        1104 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     616             : 
     617        1104 :         if (pri->whereClause == NULL)
     618         742 :             continue;
     619             : 
     620             :         /*
     621             :          * If the publication doesn't publish changes via the root partitioned
     622             :          * table, the partition's row filter will be used. So disallow using
     623             :          * WHERE clause on partitioned table in this case.
     624             :          */
     625         362 :         if (!pubviaroot &&
     626         340 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     627           6 :             ereport(ERROR,
     628             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     629             :                      errmsg("cannot use publication WHERE clause for relation \"%s\"",
     630             :                             RelationGetRelationName(pri->relation)),
     631             :                      errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
     632             :                                "publish_via_partition_root")));
     633             : 
     634             :         /*
     635             :          * A fresh pstate is required so that we only have "this" table in its
     636             :          * rangetable
     637             :          */
     638         356 :         pstate = make_parsestate(NULL);
     639         356 :         pstate->p_sourcetext = queryString;
     640         356 :         nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
     641             :                                                AccessShareLock, NULL,
     642             :                                                false, false);
     643         356 :         addNSItemToQuery(pstate, nsitem, false, true, true);
     644             : 
     645         356 :         whereclause = transformWhereClause(pstate,
     646         356 :                                            copyObject(pri->whereClause),
     647             :                                            EXPR_KIND_WHERE,
     648             :                                            "PUBLICATION WHERE");
     649             : 
     650             :         /* Fix up collation information */
     651         344 :         assign_expr_collations(pstate, whereclause);
     652             : 
     653             :         /*
     654             :          * We allow only simple expressions in row filters. See
     655             :          * check_simple_rowfilter_expr_walker.
                     :          *
                     :          * The return value is not examined here; the walker reports a
                     :          * disallowed node by raising an ERROR.
     656             :          */
     657         344 :         check_simple_rowfilter_expr(whereclause, pstate);
     658             : 
     659         302 :         free_parsestate(pstate);
     660             : 
     661         302 :         pri->whereClause = whereclause;
     662             :     }
     663         982 : }
     664             : 
     665             : 
     666             : /*
     667             :  * Given a list of tables that are going to be added to a publication,
     668             :  * verify that they fulfill the necessary preconditions, namely: no tables
     669             :  * have a column list if any schema is published; and partitioned tables do
     670             :  * not have column lists if publish_via_partition_root is not set.
     671             :  *
                     :  * 'pubname' is the publication name; it is used only in error messages.
                     :  * 'tables' is a list of PublicationRelInfo; entries without a column list
                     :  * are skipped.
     672             :  * 'publish_schema' indicates that the publication contains any TABLES IN
     673             :  * SCHEMA elements (newly added in this command, or preexisting).
     674             :  * 'pubviaroot' is the value of publish_via_partition_root.
                     :  *
                     :  * Nothing is returned; the first violation found is reported with an ERROR.
     675             :  */
     676             : static void
     677         982 : CheckPubRelationColumnList(char *pubname, List *tables,
     678             :                            bool publish_schema, bool pubviaroot)
     679             : {
     680             :     ListCell   *lc;
     681             : 
     682        1996 :     foreach(lc, tables)
     683             :     {
     684        1044 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     685             : 
     686        1044 :         if (pri->columns == NIL)
     687         718 :             continue;
     688             : 
     689             :         /*
     690             :          * Disallow specifying column list if any schema is in the
     691             :          * publication.
     692             :          *
     693             :          * XXX We could instead just forbid the case when the publication
     694             :          * tries to publish the table with a column list and a schema for that
     695             :          * table. However, if we do that then we need a restriction during
     696             :          * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
     697             :          * seem to be a good idea.
     698             :          */
     699         326 :         if (publish_schema)
     700          24 :             ereport(ERROR,
     701             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     702             :                     errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     703             :                            get_namespace_name(RelationGetNamespace(pri->relation)),
     704             :                            RelationGetRelationName(pri->relation), pubname),
     705             :                     errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
     706             : 
     707             :         /*
     708             :          * If the publication doesn't publish changes via the root partitioned
     709             :          * table, the partition's column list will be used. So disallow using
     710             :          * a column list on the partitioned table in this case.
     711             :          */
     712         302 :         if (!pubviaroot &&
     713         226 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     714           6 :             ereport(ERROR,
     715             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     716             :                      errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     717             :                             get_namespace_name(RelationGetNamespace(pri->relation)),
     718             :                             RelationGetRelationName(pri->relation), pubname),
     719             :                      errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
     720             :                                "publish_via_partition_root")));
     721             :     }
     722         952 : }
     723             : 
     724             : /*
     725             :  * Create new publication.
                     :  *
                     :  * The caller must have CREATE privilege on the database (MyDatabaseId);
                     :  * FOR ALL TABLES and FOR TABLES IN SCHEMA additionally require superuser.
                     :  * An ERROR is raised if a publication named stmt->pubname already exists.
                     :  *
                     :  * A pg_publication row is built from stmt->options and inserted, and the
                     :  * dependency on the owner is recorded.  Then, for FOR ALL TABLES, the whole
                     :  * relcache is invalidated; otherwise the tables and schemas named in
                     :  * stmt->pubobjects are attached to the publication.  A WARNING is issued if
                     :  * wal_level is not WAL_LEVEL_LOGICAL.
                     :  *
                     :  * Returns the ObjectAddress of the new publication.
     726             :  */
     727             : ObjectAddress
     728         684 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
     729             : {
     730             :     Relation    rel;
     731             :     ObjectAddress myself;
     732             :     Oid         puboid;
     733             :     bool        nulls[Natts_pg_publication];
     734             :     Datum       values[Natts_pg_publication];
     735             :     HeapTuple   tup;
     736             :     bool        publish_given;
     737             :     PublicationActions pubactions;
     738             :     bool        publish_via_partition_root_given;
     739             :     bool        publish_via_partition_root;
     740             :     AclResult   aclresult;
     741         684 :     List       *relations = NIL;
     742         684 :     List       *schemaidlist = NIL;
     743             : 
     744             :     /* must have CREATE privilege on database */
     745         684 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
     746         684 :     if (aclresult != ACLCHECK_OK)
     747           6 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     748           6 :                        get_database_name(MyDatabaseId));
     749             : 
     750             :     /* FOR ALL TABLES requires superuser */
     751         678 :     if (stmt->for_all_tables && !superuser())
     752           0 :         ereport(ERROR,
     753             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     754             :                  errmsg("must be superuser to create FOR ALL TABLES publication")));
     755             : 
     756         678 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     757             : 
     758             :     /* Check if name is used */
     759         678 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     760             :                              CStringGetDatum(stmt->pubname));
     761         678 :     if (OidIsValid(puboid))
     762           6 :         ereport(ERROR,
     763             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     764             :                  errmsg("publication \"%s\" already exists",
     765             :                         stmt->pubname)));
     766             : 
     767             :     /* Form a tuple. */
     768         672 :     memset(values, 0, sizeof(values));
     769         672 :     memset(nulls, false, sizeof(nulls));
     770             : 
     771         672 :     values[Anum_pg_publication_pubname - 1] =
     772         672 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
     773         672 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
     774             : 
     775         672 :     parse_publication_options(pstate,
     776             :                               stmt->options,
     777             :                               &publish_given, &pubactions,
     778             :                               &publish_via_partition_root_given,
     779             :                               &publish_via_partition_root);
     780             : 
     781         654 :     puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
     782             :                                 Anum_pg_publication_oid);
     783         654 :     values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
     784         654 :     values[Anum_pg_publication_puballtables - 1] =
     785         654 :         BoolGetDatum(stmt->for_all_tables);
     786         654 :     values[Anum_pg_publication_pubinsert - 1] =
     787         654 :         BoolGetDatum(pubactions.pubinsert);
     788         654 :     values[Anum_pg_publication_pubupdate - 1] =
     789         654 :         BoolGetDatum(pubactions.pubupdate);
     790         654 :     values[Anum_pg_publication_pubdelete - 1] =
     791         654 :         BoolGetDatum(pubactions.pubdelete);
     792         654 :     values[Anum_pg_publication_pubtruncate - 1] =
     793         654 :         BoolGetDatum(pubactions.pubtruncate);
     794         654 :     values[Anum_pg_publication_pubviaroot - 1] =
     795         654 :         BoolGetDatum(publish_via_partition_root);
     796             : 
     797         654 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     798             : 
     799             :     /* Insert tuple into catalog. */
     800         654 :     CatalogTupleInsert(rel, tup);
     801         654 :     heap_freetuple(tup);
     802             : 
     803         654 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
     804             : 
     805         654 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
     806             : 
     807             :     /* Make the changes visible. */
     808         654 :     CommandCounterIncrement();
     809             : 
     810             :     /* Associate objects with the publication. */
     811         654 :     if (stmt->for_all_tables)
     812             :     {
     813             :         /* Invalidate relcache so that publication info is rebuilt. */
     814          62 :         CacheInvalidateRelcacheAll();
     815             :     }
     816             :     else
     817             :     {
     818         592 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
     819             :                                    &schemaidlist);
     820             : 
     821             :         /* FOR TABLES IN SCHEMA requires superuser */
     822         574 :         if (schemaidlist != NIL && !superuser())
     823           6 :             ereport(ERROR,
     824             :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     825             :                     errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
     826             : 
     827         568 :         if (relations != NIL)
     828             :         {
     829             :             List       *rels;
     830             : 
     831         376 :             rels = OpenTableList(relations);
     832         352 :             TransformPubWhereClauses(rels, pstate->p_sourcetext,
     833             :                                      publish_via_partition_root);
     834             : 
     835         328 :             CheckPubRelationColumnList(stmt->pubname, rels,
     836             :                                        schemaidlist != NIL,
     837             :                                        publish_via_partition_root);
     838             : 
     839         322 :             PublicationAddTables(puboid, rels, true, NULL);
     840         296 :             CloseTableList(rels);
     841             :         }
     842             : 
     843         488 :         if (schemaidlist != NIL)
     844             :         {
     845             :             /*
     846             :              * Schema lock is held until the publication is created to prevent
     847             :              * concurrent schema deletion.
     848             :              */
     849         134 :             LockSchemaList(schemaidlist);
     850         134 :             PublicationAddSchemas(puboid, schemaidlist, true, NULL);
     851             :         }
     852             :     }
     853             : 
     854         544 :     table_close(rel, RowExclusiveLock);
     855             : 
     856         544 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
     857             : 
     858         544 :     if (wal_level != WAL_LEVEL_LOGICAL)
     859         306 :         ereport(WARNING,
     860             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     861             :                  errmsg("wal_level is insufficient to publish logical changes"),
     862             :                  errhint("Set wal_level to \"logical\" before creating subscriptions.")));
     863             : 
     864         544 :     return myself;
     865             : }
     866             : 
     867             : /*
     868             :  * Change options of a publication.
                     :  *
                     :  * 'stmt->options' supplies the new option values.  'tup' is the
                     :  * publication's current catalog tuple and 'rel' is the catalog relation it
                     :  * is updated in (presumably pg_publication, opened by the caller).
                     :  *
                     :  * Only the columns for options actually given in the command are replaced.
                     :  * For a publication that is not FOR ALL TABLES, setting
                     :  * publish_via_partition_root to false is rejected with an ERROR if the
                     :  * publication has a WHERE clause or a column list on a partitioned table.
                     :  *
                     :  * After the catalog update, the relcache entries of the published relations
                     :  * are invalidated, the command is reported to event triggers, and the
                     :  * post-alter hook is invoked.
     869             :  */
     870             : static void
     871         110 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
     872             :                         Relation rel, HeapTuple tup)
     873             : {
     874             :     bool        nulls[Natts_pg_publication];
     875             :     bool        replaces[Natts_pg_publication];
     876             :     Datum       values[Natts_pg_publication];
     877             :     bool        publish_given;
     878             :     PublicationActions pubactions;
     879             :     bool        publish_via_partition_root_given;
     880             :     bool        publish_via_partition_root;
     881             :     ObjectAddress obj;
     882             :     Form_pg_publication pubform;
     883         110 :     List       *root_relids = NIL;
     884             :     ListCell   *lc;
     885             : 
     886         110 :     parse_publication_options(pstate,
     887             :                               stmt->options,
     888             :                               &publish_given, &pubactions,
     889             :                               &publish_via_partition_root_given,
     890             :                               &publish_via_partition_root);
     891             : 
     892         110 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     893             : 
     894             :     /*
     895             :      * If the publication doesn't publish changes via the root partitioned
     896             :      * table, the partition's row filter and column list will be used. So
     897             :      * disallow using WHERE clause and column lists on partitioned table in
     898             :      * this case.
     899             :      */
     900         110 :     if (!pubform->puballtables && publish_via_partition_root_given &&
     901          80 :         !publish_via_partition_root)
     902             :     {
     903             :         /*
     904             :          * Lock the publication so nobody else can do anything with it. This
     905             :          * prevents concurrent alter to add partitioned table(s) with WHERE
     906             :          * clause(s) and/or column lists which we don't allow when not
     907             :          * publishing via root.
                     :          *
                     :          * NOTE(review): the mode requested below is AccessShareLock, so this
                     :          * only excludes sessions that take a conflicting (presumably
                     :          * AccessExclusiveLock) lock on the publication -- confirm against the
                     :          * other ALTER PUBLICATION code paths.
     908             :          */
     909          48 :         LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
     910             :                            AccessShareLock);
     911             : 
     912          48 :         root_relids = GetPublicationRelations(pubform->oid,
     913             :                                               PUBLICATION_PART_ROOT);
     914             : 
     915          84 :         foreach(lc, root_relids)
     916             :         {
     917          48 :             Oid         relid = lfirst_oid(lc);
     918             :             HeapTuple   rftuple;
     919             :             char        relkind;
     920             :             char       *relname;
     921             :             bool        has_rowfilter;
     922             :             bool        has_collist;
     923             : 
     924             :             /*
     925             :              * Beware: we don't have lock on the relations, so cope silently
     926             :              * with the cache lookups returning NULL.
     927             :              */
     928             : 
     929          48 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     930             :                                       ObjectIdGetDatum(relid),
     931             :                                       ObjectIdGetDatum(pubform->oid));
     932          48 :             if (!HeapTupleIsValid(rftuple))
     933           0 :                 continue;
     934          48 :             has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
     935          48 :             has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
     936          48 :             if (!has_rowfilter && !has_collist)
     937             :             {
     938          12 :                 ReleaseSysCache(rftuple);
     939          12 :                 continue;
     940             :             }
     941             : 
     942          36 :             relkind = get_rel_relkind(relid);
     943          36 :             if (relkind != RELKIND_PARTITIONED_TABLE)
     944             :             {
     945          24 :                 ReleaseSysCache(rftuple);
     946          24 :                 continue;
     947             :             }
     948          12 :             relname = get_rel_name(relid);
     949          12 :             if (relname == NULL)    /* table concurrently dropped */
     950             :             {
     951           0 :                 ReleaseSysCache(rftuple);
     952           0 :                 continue;
     953             :             }
     954             : 
     955          12 :             if (has_rowfilter)
     956           6 :                 ereport(ERROR,
     957             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     958             :                          errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
     959             :                                 "publish_via_partition_root",
     960             :                                 stmt->pubname),
     961             :                          errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
     962             :                                    relname, "publish_via_partition_root")));
     963             :             Assert(has_collist);
     964           6 :             ereport(ERROR,
     965             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     966             :                      errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
     967             :                             "publish_via_partition_root",
     968             :                             stmt->pubname),
     969             :                      errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
     970             :                                relname, "publish_via_partition_root")));
     971             :         }
     972             :     }
     973             : 
     974             :     /* Everything ok, form a new tuple. */
     975          98 :     memset(values, 0, sizeof(values));
     976          98 :     memset(nulls, false, sizeof(nulls));
     977          98 :     memset(replaces, false, sizeof(replaces));
     978             : 
     979          98 :     if (publish_given)
     980             :     {
     981          28 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
     982          28 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
     983             : 
     984          28 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
     985          28 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
     986             : 
     987          28 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
     988          28 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
     989             : 
     990          28 :         values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
     991          28 :         replaces[Anum_pg_publication_pubtruncate - 1] = true;
     992             :     }
     993             : 
     994          98 :     if (publish_via_partition_root_given)
     995             :     {
     996          70 :         values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
     997          70 :         replaces[Anum_pg_publication_pubviaroot - 1] = true;
     998             :     }
     999             : 
    1000          98 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1001             :                             replaces);
    1002             : 
    1003             :     /* Update the catalog. */
    1004          98 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1005             : 
    1006          98 :     CommandCounterIncrement();
    1007             : 
    1008          98 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1009             : 
    1010             :     /* Invalidate the relcache. */
    1011          98 :     if (pubform->puballtables)
    1012             :     {
    1013           8 :         CacheInvalidateRelcacheAll();
    1014             :     }
    1015             :     else
    1016             :     {
    1017          90 :         List       *relids = NIL;
    1018          90 :         List       *schemarelids = NIL;
    1019             : 
    1020             :         /*
    1021             :          * For any partitioned tables contained in the publication, we must
    1022             :          * invalidate all partitions contained in the respective partition
    1023             :          * trees, not just those explicitly mentioned in the publication.
    1024             :          */
    1025          90 :         if (root_relids == NIL)
    1026          54 :             relids = GetPublicationRelations(pubform->oid,
    1027             :                                              PUBLICATION_PART_ALL);
    1028             :         else
    1029             :         {
    1030             :             /*
    1031             :              * We already got tables explicitly mentioned in the publication.
    1032             :              * Now get all partitions for the partitioned table in the list.
    1033             :              */
    1034          72 :             foreach(lc, root_relids)
    1035          36 :                 relids = GetPubPartitionOptionRelations(relids,
    1036             :                                                         PUBLICATION_PART_ALL,
    1037             :                                                         lfirst_oid(lc));
    1038             :         }
    1039             : 
    1040          90 :         schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
    1041             :                                                         PUBLICATION_PART_ALL);
    1042          90 :         relids = list_concat_unique_oid(relids, schemarelids);
    1043             : 
    1044          90 :         InvalidatePublicationRels(relids);
    1045             :     }
    1046             : 
    1047          98 :     ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
    1048          98 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1049             :                                      (Node *) stmt);
    1050             : 
    1051          98 :     InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
    1052          98 : }
    1053             : 
    1054             : /*
    1055             :  * Invalidate the relations.
                     :  *
                     :  * 'relids' is a list of relation OIDs.  Each one gets its own relcache
                     :  * invalidation unless the list has MAX_RELCACHE_INVAL_MSGS or more entries,
                     :  * in which case the whole relcache is invalidated instead.
    1056             :  */
    1057             : void
    1058        2176 : InvalidatePublicationRels(List *relids)
    1059             : {
    1060             :     /*
    1061             :      * We don't want to send too many individual messages; at some point it's
    1062             :      * cheaper to just reset the whole relcache.
    1063             :      */
    1064        2176 :     if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
    1065             :     {
    1066             :         ListCell   *lc;
    1067             : 
    1068       20888 :         foreach(lc, relids)
    1069       18712 :             CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
    1070             :     }
    1071             :     else
    1072           0 :         CacheInvalidateRelcacheAll();
    1073        2176 : }
    1074             : 
    1075             : /*
    1076             :  * Add or remove table to/from publication.
    1077             :  */
    1078             : static void
    1079         848 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
    1080             :                        List *tables, const char *queryString,
    1081             :                        bool publish_schema)
    1082             : {
    1083         848 :     List       *rels = NIL;
    1084         848 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1085         848 :     Oid         pubid = pubform->oid;
    1086             : 
    1087             :     /*
    1088             :      * Nothing to do if no objects, except in SET: for that it is quite
    1089             :      * possible that user has not specified any tables in which case we need
    1090             :      * to remove all the existing tables.
    1091             :      */
    1092         848 :     if (!tables && stmt->action != AP_SetObjects)
    1093          66 :         return;
    1094             : 
    1095         782 :     rels = OpenTableList(tables);
    1096             : 
    1097         782 :     if (stmt->action == AP_AddObjects)
    1098             :     {
    1099         268 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1100             : 
    1101         250 :         publish_schema |= is_schema_publication(pubid);
    1102             : 
    1103         250 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1104         250 :                                    pubform->pubviaroot);
    1105             : 
    1106         238 :         PublicationAddTables(pubid, rels, false, stmt);
    1107             :     }
    1108         514 :     else if (stmt->action == AP_DropObjects)
    1109          92 :         PublicationDropTables(pubid, rels, false);
    1110             :     else                        /* AP_SetObjects */
    1111             :     {
    1112         422 :         List       *oldrelids = GetPublicationRelations(pubid,
    1113             :                                                         PUBLICATION_PART_ROOT);
    1114         422 :         List       *delrels = NIL;
    1115             :         ListCell   *oldlc;
    1116             : 
    1117         422 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1118             : 
    1119         404 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1120         404 :                                    pubform->pubviaroot);
    1121             : 
    1122             :         /*
    1123             :          * To recreate the relation list for the publication, look for
    1124             :          * existing relations that do not need to be dropped.
    1125             :          */
    1126         770 :         foreach(oldlc, oldrelids)
    1127             :         {
    1128         378 :             Oid         oldrelid = lfirst_oid(oldlc);
    1129             :             ListCell   *newlc;
    1130             :             PublicationRelInfo *oldrel;
    1131         378 :             bool        found = false;
    1132             :             HeapTuple   rftuple;
    1133         378 :             Node       *oldrelwhereclause = NULL;
    1134         378 :             Bitmapset  *oldcolumns = NULL;
    1135             : 
    1136             :             /* look up the cache for the old relmap */
    1137         378 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1138             :                                       ObjectIdGetDatum(oldrelid),
    1139             :                                       ObjectIdGetDatum(pubid));
    1140             : 
    1141             :             /*
    1142             :              * See if the existing relation currently has a WHERE clause or a
    1143             :              * column list. We need to compare those too.
    1144             :              */
    1145         378 :             if (HeapTupleIsValid(rftuple))
    1146             :             {
    1147         378 :                 bool        isnull = true;
    1148             :                 Datum       whereClauseDatum;
    1149             :                 Datum       columnListDatum;
    1150             : 
    1151             :                 /* Load the WHERE clause for this table. */
    1152         378 :                 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1153             :                                                    Anum_pg_publication_rel_prqual,
    1154             :                                                    &isnull);
    1155         378 :                 if (!isnull)
    1156         210 :                     oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
    1157             : 
    1158             :                 /* Transform the int2vector column list to a bitmap. */
    1159         378 :                 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1160             :                                                   Anum_pg_publication_rel_prattrs,
    1161             :                                                   &isnull);
    1162             : 
    1163         378 :                 if (!isnull)
    1164         124 :                     oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
    1165             : 
    1166         378 :                 ReleaseSysCache(rftuple);
    1167             :             }
    1168             : 
    1169         742 :             foreach(newlc, rels)
    1170             :             {
    1171             :                 PublicationRelInfo *newpubrel;
    1172             :                 Oid         newrelid;
    1173         380 :                 Bitmapset  *newcolumns = NULL;
    1174             : 
    1175         380 :                 newpubrel = (PublicationRelInfo *) lfirst(newlc);
    1176         380 :                 newrelid = RelationGetRelid(newpubrel->relation);
    1177             : 
    1178             :                 /*
    1179             :                  * If the new publication has column list, transform it to a
    1180             :                  * bitmap too.
    1181             :                  */
    1182         380 :                 if (newpubrel->columns)
    1183             :                 {
    1184             :                     ListCell   *lc;
    1185             : 
    1186         354 :                     foreach(lc, newpubrel->columns)
    1187             :                     {
    1188         218 :                         char       *colname = strVal(lfirst(lc));
    1189         218 :                         AttrNumber  attnum = get_attnum(newrelid, colname);
    1190             : 
    1191         218 :                         newcolumns = bms_add_member(newcolumns, attnum);
    1192             :                     }
    1193             :                 }
    1194             : 
    1195             :                 /*
    1196             :                  * Check if any of the new set of relations matches with the
    1197             :                  * existing relations in the publication. Additionally, if the
    1198             :                  * relation has an associated WHERE clause, check the WHERE
    1199             :                  * expressions also match. Same for the column list. Drop the
    1200             :                  * rest.
    1201             :                  */
    1202         380 :                 if (RelationGetRelid(newpubrel->relation) == oldrelid)
    1203             :                 {
    1204         260 :                     if (equal(oldrelwhereclause, newpubrel->whereClause) &&
    1205          68 :                         bms_equal(oldcolumns, newcolumns))
    1206             :                     {
    1207          16 :                         found = true;
    1208          16 :                         break;
    1209             :                     }
    1210             :                 }
    1211             :             }
    1212             : 
    1213             :             /*
    1214             :              * Add the non-matched relations to a list so that they can be
    1215             :              * dropped.
    1216             :              */
    1217         378 :             if (!found)
    1218             :             {
    1219         362 :                 oldrel = palloc(sizeof(PublicationRelInfo));
    1220         362 :                 oldrel->whereClause = NULL;
    1221         362 :                 oldrel->columns = NIL;
    1222         362 :                 oldrel->relation = table_open(oldrelid,
    1223             :                                               ShareUpdateExclusiveLock);
    1224         362 :                 delrels = lappend(delrels, oldrel);
    1225             :             }
    1226             :         }
    1227             : 
    1228             :         /* And drop them. */
    1229         392 :         PublicationDropTables(pubid, delrels, true);
    1230             : 
    1231             :         /*
    1232             :          * Don't bother calculating the difference for adding, we'll catch and
    1233             :          * skip existing ones when doing catalog update.
    1234             :          */
    1235         392 :         PublicationAddTables(pubid, rels, true, stmt);
    1236             : 
    1237         392 :         CloseTableList(delrels);
    1238             :     }
    1239             : 
    1240         668 :     CloseTableList(rels);
    1241             : }
    1242             : 
    1243             : /*
    1244             :  * Alter the publication schemas.
    1245             :  *
    1246             :  * Add or remove schemas to/from publication.
    1247             :  */
    1248             : static void
    1249         734 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
    1250             :                         HeapTuple tup, List *schemaidlist)
    1251             : {
    1252         734 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1253             : 
    1254             :     /*
    1255             :      * Nothing to do if no objects, except in SET: for that it is quite
    1256             :      * possible that user has not specified any schemas in which case we need
    1257             :      * to remove all the existing schemas.
    1258             :      */
    1259         734 :     if (!schemaidlist && stmt->action != AP_SetObjects)
    1260         276 :         return;
    1261             : 
    1262             :     /*
    1263             :      * Schema lock is held until the publication is altered to prevent
    1264             :      * concurrent schema deletion.
    1265             :      */
    1266         458 :     LockSchemaList(schemaidlist);
    1267         458 :     if (stmt->action == AP_AddObjects)
    1268             :     {
    1269             :         ListCell   *lc;
    1270             :         List       *reloids;
    1271             : 
    1272          28 :         reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
    1273             : 
    1274          36 :         foreach(lc, reloids)
    1275             :         {
    1276             :             HeapTuple   coltuple;
    1277             : 
    1278          14 :             coltuple = SearchSysCache2(PUBLICATIONRELMAP,
    1279             :                                        ObjectIdGetDatum(lfirst_oid(lc)),
    1280             :                                        ObjectIdGetDatum(pubform->oid));
    1281             : 
    1282          14 :             if (!HeapTupleIsValid(coltuple))
    1283           0 :                 continue;
    1284             : 
    1285             :             /*
    1286             :              * Disallow adding schema if column list is already part of the
    1287             :              * publication. See CheckPubRelationColumnList.
    1288             :              */
    1289          14 :             if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
    1290           6 :                 ereport(ERROR,
    1291             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1292             :                         errmsg("cannot add schema to publication \"%s\"",
    1293             :                                stmt->pubname),
    1294             :                         errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
    1295             : 
    1296           8 :             ReleaseSysCache(coltuple);
    1297             :         }
    1298             : 
    1299          22 :         PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
    1300             :     }
    1301         430 :     else if (stmt->action == AP_DropObjects)
    1302          38 :         PublicationDropSchemas(pubform->oid, schemaidlist, false);
    1303             :     else                        /* AP_SetObjects */
    1304             :     {
    1305         392 :         List       *oldschemaids = GetPublicationSchemas(pubform->oid);
    1306         392 :         List       *delschemas = NIL;
    1307             : 
    1308             :         /* Identify which schemas should be dropped */
    1309         392 :         delschemas = list_difference_oid(oldschemaids, schemaidlist);
    1310             : 
    1311             :         /*
    1312             :          * Schema lock is held until the publication is altered to prevent
    1313             :          * concurrent schema deletion.
    1314             :          */
    1315         392 :         LockSchemaList(delschemas);
    1316             : 
    1317             :         /* And drop them */
    1318         392 :         PublicationDropSchemas(pubform->oid, delschemas, true);
    1319             : 
    1320             :         /*
    1321             :          * Don't bother calculating the difference for adding, we'll catch and
    1322             :          * skip existing ones when doing catalog update.
    1323             :          */
    1324         392 :         PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
    1325             :     }
    1326             : }
    1327             : 
    1328             : /*
    1329             :  * Check if relations and schemas can be in a given publication and throw
    1330             :  * appropriate error if not.
    1331             :  */
    1332             : static void
    1333         890 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
    1334             :                       List *tables, List *schemaidlist)
    1335             : {
    1336         890 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1337             : 
    1338         890 :     if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
    1339          94 :         schemaidlist && !superuser())
    1340           6 :         ereport(ERROR,
    1341             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1342             :                  errmsg("must be superuser to add or set schemas")));
    1343             : 
    1344             :     /*
    1345             :      * Check that user is allowed to manipulate the publication tables in
    1346             :      * schema
    1347             :      */
    1348         884 :     if (schemaidlist && pubform->puballtables)
    1349          18 :         ereport(ERROR,
    1350             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1351             :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1352             :                         NameStr(pubform->pubname)),
    1353             :                  errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
    1354             : 
    1355             :     /* Check that user is allowed to manipulate the publication tables. */
    1356         866 :     if (tables && pubform->puballtables)
    1357          18 :         ereport(ERROR,
    1358             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1359             :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1360             :                         NameStr(pubform->pubname)),
    1361             :                  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
    1362         848 : }
    1363             : 
    1364             : /*
    1365             :  * Alter the existing publication.
    1366             :  *
    1367             :  * This is dispatcher function for AlterPublicationOptions,
    1368             :  * AlterPublicationSchemas and AlterPublicationTables.
    1369             :  */
    1370             : void
    1371        1018 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
    1372             : {
    1373             :     Relation    rel;
    1374             :     HeapTuple   tup;
    1375             :     Form_pg_publication pubform;
    1376             : 
    1377        1018 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1378             : 
    1379        1018 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
    1380             :                               CStringGetDatum(stmt->pubname));
    1381             : 
    1382        1018 :     if (!HeapTupleIsValid(tup))
    1383           0 :         ereport(ERROR,
    1384             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1385             :                  errmsg("publication \"%s\" does not exist",
    1386             :                         stmt->pubname)));
    1387             : 
    1388        1018 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1389             : 
    1390             :     /* must be owner */
    1391        1018 :     if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
    1392           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1393           0 :                        stmt->pubname);
    1394             : 
    1395        1018 :     if (stmt->options)
    1396         110 :         AlterPublicationOptions(pstate, stmt, rel, tup);
    1397             :     else
    1398             :     {
    1399         908 :         List       *relations = NIL;
    1400         908 :         List       *schemaidlist = NIL;
    1401         908 :         Oid         pubid = pubform->oid;
    1402             : 
    1403         908 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
    1404             :                                    &schemaidlist);
    1405             : 
    1406         890 :         CheckAlterPublication(stmt, tup, relations, schemaidlist);
    1407             : 
    1408         848 :         heap_freetuple(tup);
    1409             : 
    1410             :         /* Lock the publication so nobody else can do anything with it. */
    1411         848 :         LockDatabaseObject(PublicationRelationId, pubid, 0,
    1412             :                            AccessExclusiveLock);
    1413             : 
    1414             :         /*
    1415             :          * It is possible that by the time we acquire the lock on publication,
    1416             :          * concurrent DDL has removed it. We can test this by checking the
    1417             :          * existence of publication. We get the tuple again to avoid the risk
    1418             :          * of any publication option getting changed.
    1419             :          */
    1420         848 :         tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1421         848 :         if (!HeapTupleIsValid(tup))
    1422           0 :             ereport(ERROR,
    1423             :                     errcode(ERRCODE_UNDEFINED_OBJECT),
    1424             :                     errmsg("publication \"%s\" does not exist",
    1425             :                            stmt->pubname));
    1426             : 
    1427         848 :         AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
    1428             :                                schemaidlist != NIL);
    1429         734 :         AlterPublicationSchemas(stmt, tup, schemaidlist);
    1430             :     }
    1431             : 
    1432             :     /* Cleanup. */
    1433         814 :     heap_freetuple(tup);
    1434         814 :     table_close(rel, RowExclusiveLock);
    1435         814 : }
    1436             : 
    1437             : /*
    1438             :  * Remove relation from publication by mapping OID.
    1439             :  */
    1440             : void
    1441         760 : RemovePublicationRelById(Oid proid)
    1442             : {
    1443             :     Relation    rel;
    1444             :     HeapTuple   tup;
    1445             :     Form_pg_publication_rel pubrel;
    1446         760 :     List       *relids = NIL;
    1447             : 
    1448         760 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
    1449             : 
    1450         760 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
    1451             : 
    1452         760 :     if (!HeapTupleIsValid(tup))
    1453           0 :         elog(ERROR, "cache lookup failed for publication table %u",
    1454             :              proid);
    1455             : 
    1456         760 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
    1457             : 
    1458             :     /*
    1459             :      * Invalidate relcache so that publication info is rebuilt.
    1460             :      *
    1461             :      * For the partitioned tables, we must invalidate all partitions contained
    1462             :      * in the respective partition hierarchies, not just the one explicitly
    1463             :      * mentioned in the publication. This is required because we implicitly
    1464             :      * publish the child tables when the parent table is published.
    1465             :      */
    1466         760 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
    1467             :                                             pubrel->prrelid);
    1468             : 
    1469         760 :     InvalidatePublicationRels(relids);
    1470             : 
    1471         760 :     CatalogTupleDelete(rel, &tup->t_self);
    1472             : 
    1473         760 :     ReleaseSysCache(tup);
    1474             : 
    1475         760 :     table_close(rel, RowExclusiveLock);
    1476         760 : }
    1477             : 
    1478             : /*
    1479             :  * Remove the publication by mapping OID.
    1480             :  */
    1481             : void
    1482         370 : RemovePublicationById(Oid pubid)
    1483             : {
    1484             :     Relation    rel;
    1485             :     HeapTuple   tup;
    1486             :     Form_pg_publication pubform;
    1487             : 
    1488         370 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1489             : 
    1490         370 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1491         370 :     if (!HeapTupleIsValid(tup))
    1492           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1493             : 
    1494         370 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1495             : 
    1496             :     /* Invalidate relcache so that publication info is rebuilt. */
    1497         370 :     if (pubform->puballtables)
    1498          24 :         CacheInvalidateRelcacheAll();
    1499             : 
    1500         370 :     CatalogTupleDelete(rel, &tup->t_self);
    1501             : 
    1502         370 :     ReleaseSysCache(tup);
    1503             : 
    1504         370 :     table_close(rel, RowExclusiveLock);
    1505         370 : }
    1506             : 
    1507             : /*
    1508             :  * Remove schema from publication by mapping OID.
    1509             :  */
    1510             : void
    1511         192 : RemovePublicationSchemaById(Oid psoid)
    1512             : {
    1513             :     Relation    rel;
    1514             :     HeapTuple   tup;
    1515         192 :     List       *schemaRels = NIL;
    1516             :     Form_pg_publication_namespace pubsch;
    1517             : 
    1518         192 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
    1519             : 
    1520         192 :     tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
    1521             : 
    1522         192 :     if (!HeapTupleIsValid(tup))
    1523           0 :         elog(ERROR, "cache lookup failed for publication schema %u", psoid);
    1524             : 
    1525         192 :     pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
    1526             : 
    1527             :     /*
    1528             :      * Invalidate relcache so that publication info is rebuilt. See
    1529             :      * RemovePublicationRelById for why we need to consider all the
    1530             :      * partitions.
    1531             :      */
    1532         192 :     schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
    1533             :                                                PUBLICATION_PART_ALL);
    1534         192 :     InvalidatePublicationRels(schemaRels);
    1535             : 
    1536         192 :     CatalogTupleDelete(rel, &tup->t_self);
    1537             : 
    1538         192 :     ReleaseSysCache(tup);
    1539             : 
    1540         192 :     table_close(rel, RowExclusiveLock);
    1541         192 : }
    1542             : 
    1543             : /*
    1544             :  * Open relations specified by a PublicationTable list.
    1545             :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
    1546             :  * add them to a publication.
    1547             :  */
    1548             : static List *
    1549        1158 : OpenTableList(List *tables)
    1550             : {
    1551        1158 :     List       *relids = NIL;
    1552        1158 :     List       *rels = NIL;
    1553             :     ListCell   *lc;
    1554        1158 :     List       *relids_with_rf = NIL;
    1555        1158 :     List       *relids_with_collist = NIL;
    1556             : 
    1557             :     /*
    1558             :      * Open, share-lock, and check all the explicitly-specified relations
    1559             :      */
    1560        2376 :     foreach(lc, tables)
    1561             :     {
    1562        1242 :         PublicationTable *t = lfirst_node(PublicationTable, lc);
    1563        1242 :         bool        recurse = t->relation->inh;
    1564             :         Relation    rel;
    1565             :         Oid         myrelid;
    1566             :         PublicationRelInfo *pub_rel;
    1567             : 
    1568             :         /* Allow query cancel in case this takes a long time */
    1569        1242 :         CHECK_FOR_INTERRUPTS();
    1570             : 
    1571        1242 :         rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
    1572        1242 :         myrelid = RelationGetRelid(rel);
    1573             : 
    1574             :         /*
    1575             :          * Filter out duplicates if user specifies "foo, foo".
    1576             :          *
    1577             :          * Note that this algorithm is known to not be very efficient (O(N^2))
    1578             :          * but given that it only works on list of tables given to us by user
    1579             :          * it's deemed acceptable.
    1580             :          */
    1581        1242 :         if (list_member_oid(relids, myrelid))
    1582             :         {
    1583             :             /* Disallow duplicate tables if there are any with row filters. */
    1584          24 :             if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
    1585          12 :                 ereport(ERROR,
    1586             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1587             :                          errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1588             :                                 RelationGetRelationName(rel))));
    1589             : 
    1590             :             /* Disallow duplicate tables if there are any with column lists. */
    1591          12 :             if (t->columns || list_member_oid(relids_with_collist, myrelid))
    1592          12 :                 ereport(ERROR,
    1593             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1594             :                          errmsg("conflicting or redundant column lists for table \"%s\"",
    1595             :                                 RelationGetRelationName(rel))));
    1596             : 
    1597           0 :             table_close(rel, ShareUpdateExclusiveLock);
    1598           0 :             continue;
    1599             :         }
    1600             : 
    1601        1218 :         pub_rel = palloc(sizeof(PublicationRelInfo));
    1602        1218 :         pub_rel->relation = rel;
    1603        1218 :         pub_rel->whereClause = t->whereClause;
    1604        1218 :         pub_rel->columns = t->columns;
    1605        1218 :         rels = lappend(rels, pub_rel);
    1606        1218 :         relids = lappend_oid(relids, myrelid);
    1607             : 
    1608        1218 :         if (t->whereClause)
    1609         372 :             relids_with_rf = lappend_oid(relids_with_rf, myrelid);
    1610             : 
    1611        1218 :         if (t->columns)
    1612         332 :             relids_with_collist = lappend_oid(relids_with_collist, myrelid);
    1613             : 
    1614             :         /*
    1615             :          * Add children of this rel, if requested, so that they too are added
    1616             :          * to the publication.  A partitioned table can't have any inheritance
    1617             :          * children other than its partitions, which need not be explicitly
    1618             :          * added to the publication.
    1619             :          */
    1620        1218 :         if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
    1621             :         {
    1622             :             List       *children;
    1623             :             ListCell   *child;
    1624             : 
    1625        1036 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
    1626             :                                            NULL);
    1627             : 
    1628        2080 :             foreach(child, children)
    1629             :             {
    1630        1044 :                 Oid         childrelid = lfirst_oid(child);
    1631             : 
    1632             :                 /* Allow query cancel in case this takes a long time */
    1633        1044 :                 CHECK_FOR_INTERRUPTS();
    1634             : 
    1635             :                 /*
    1636             :                  * Skip duplicates if user specified both parent and child
    1637             :                  * tables.
    1638             :                  */
    1639        1044 :                 if (list_member_oid(relids, childrelid))
    1640             :                 {
    1641             :                     /*
    1642             :                      * We don't allow to specify row filter for both parent
    1643             :                      * and child table at the same time as it is not very
    1644             :                      * clear which one should be given preference.
    1645             :                      */
    1646        1036 :                     if (childrelid != myrelid &&
    1647           0 :                         (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
    1648           0 :                         ereport(ERROR,
    1649             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1650             :                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1651             :                                         RelationGetRelationName(rel))));
    1652             : 
    1653             :                     /*
    1654             :                      * We don't allow to specify column list for both parent
    1655             :                      * and child table at the same time as it is not very
    1656             :                      * clear which one should be given preference.
    1657             :                      */
    1658        1036 :                     if (childrelid != myrelid &&
    1659           0 :                         (t->columns || list_member_oid(relids_with_collist, childrelid)))
    1660           0 :                         ereport(ERROR,
    1661             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1662             :                                  errmsg("conflicting or redundant column lists for table \"%s\"",
    1663             :                                         RelationGetRelationName(rel))));
    1664             : 
    1665        1036 :                     continue;
    1666             :                 }
    1667             : 
    1668             :                 /* find_all_inheritors already got lock */
    1669           8 :                 rel = table_open(childrelid, NoLock);
    1670           8 :                 pub_rel = palloc(sizeof(PublicationRelInfo));
    1671           8 :                 pub_rel->relation = rel;
    1672             :                 /* child inherits WHERE clause from parent */
    1673           8 :                 pub_rel->whereClause = t->whereClause;
    1674             : 
    1675             :                 /* child inherits column list from parent */
    1676           8 :                 pub_rel->columns = t->columns;
    1677           8 :                 rels = lappend(rels, pub_rel);
    1678           8 :                 relids = lappend_oid(relids, childrelid);
    1679             : 
    1680           8 :                 if (t->whereClause)
    1681           2 :                     relids_with_rf = lappend_oid(relids_with_rf, childrelid);
    1682             : 
    1683           8 :                 if (t->columns)
    1684           0 :                     relids_with_collist = lappend_oid(relids_with_collist, childrelid);
    1685             :             }
    1686             :         }
    1687             :     }
    1688             : 
    1689        1134 :     list_free(relids);
    1690        1134 :     list_free(relids_with_rf);
    1691             : 
    1692        1134 :     return rels;
    1693             : }
    1694             : 
    1695             : /*
    1696             :  * Close all relations in the list.
                     :  *
                     :  * 'rels' is a list of PublicationRelInfo entries.  The relation held by
                     :  * each entry is closed with NoLock as the lock mode, and afterwards the
                     :  * whole list is handed to list_free_deep(), so the caller must not use
                     :  * 'rels' again once this returns.
    1697             :  */
    1698             : static void
    1699        1356 : CloseTableList(List *rels)
    1700             : {
    1701             :     ListCell   *lc;
    1702             : 
    1703        2750 :     foreach(lc, rels)
    1704             :     {
    1705             :         PublicationRelInfo *pub_rel;
    1706             : 
    1707        1394 :         pub_rel = (PublicationRelInfo *) lfirst(lc);
    1708        1394 :         table_close(pub_rel->relation, NoLock);
    1709             :     }
    1710             : 
    1711        1356 :     list_free_deep(rels);
    1712        1356 : }
    1713             : 
    1714             : /*
    1715             :  * Lock the schemas specified in the schema list in AccessShareLock mode in
    1716             :  * order to prevent concurrent schema deletion.
                     :  *
                     :  * 'schemalist' is a list of schema OIDs.  For every OID the lock is taken
                     :  * first and the schema's existence is verified afterwards; if the schema
                     :  * is no longer present an ERROR with ERRCODE_UNDEFINED_SCHEMA is raised.
                     :  * Nothing is returned; the function either locks every listed schema or
                     :  * errors out.
    1717             :  */
    1718             : static void
    1719         984 : LockSchemaList(List *schemalist)
    1720             : {
    1721             :     ListCell   *lc;
    1722             : 
    1723        1268 :     foreach(lc, schemalist)
    1724             :     {
    1725         284 :         Oid         schemaid = lfirst_oid(lc);
    1726             : 
    1727             :         /* Allow query cancel in case this takes a long time */
    1728         284 :         CHECK_FOR_INTERRUPTS();
    1729         284 :         LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
    1730             : 
    1731             :         /*
    1732             :          * It is possible that by the time we acquire the lock on schema,
    1733             :          * concurrent DDL has removed it. We can test this by checking the
    1734             :          * existence of schema.
                     :          *
                     :          * Only the OID is available here, which is why the error message
                     :          * reports the OID rather than a schema name.
    1735             :          */
    1736         284 :         if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
    1737           0 :             ereport(ERROR,
    1738             :                     errcode(ERRCODE_UNDEFINED_SCHEMA),
    1739             :                     errmsg("schema with OID %u does not exist", schemaid));
    1740             :     }
    1741         984 : }
    1742             : 
    1743             : /*
    1744             :  * Add listed tables to the publication.
                     :  *
                     :  * pubid:          OID of the publication the tables are added to.
                     :  * rels:           list of PublicationRelInfo entries, one per table.
                     :  * if_not_exists:  passed through unchanged to publication_add_relation().
                     :  * stmt:           the ALTER PUBLICATION statement, or NULL.  When it is
                     :  *                 non-NULL, each addition is also reported through
                     :  *                 EventTriggerCollectSimpleCommand() and the object
                     :  *                 post-create hook; when it is NULL both are skipped.
                     :  *
                     :  * The current user must own every listed table (object_ownercheck);
                     :  * otherwise aclcheck_error() is called with ACLCHECK_NOT_OWNER for the
                     :  * first table that fails the check.
    1745             :  */
    1746             : static void
    1747         952 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
    1748             :                      AlterPublicationStmt *stmt)
    1749             : {
    1750             :     ListCell   *lc;
    1751             : 
    1752             :     Assert(!stmt || !stmt->for_all_tables);
    1753             : 
    1754        1904 :     foreach(lc, rels)
    1755             :     {
    1756        1014 :         PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
    1757        1014 :         Relation    rel = pub_rel->relation;
    1758             :         ObjectAddress obj;
    1759             : 
    1760             :         /* Must be owner of the table or superuser. */
    1761        1014 :         if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
    1762           6 :             aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
    1763           6 :                            RelationGetRelationName(rel));
    1764             : 
    1765        1008 :         obj = publication_add_relation(pubid, pub_rel, if_not_exists);
    1766         952 :         if (stmt)
    1767             :         {
    1768         594 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1769             :                                              (Node *) stmt);
    1770             : 
    1771         594 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
    1772             :                                        obj.objectId, 0);
    1773             :         }
    1774             :     }
    1775         890 : }
    1776             : 
    1777             : /*
    1778             :  * Remove listed tables from the publication.
                     :  *
                     :  * pubid:       OID of the publication the tables are removed from.
                     :  * rels:        list of PublicationRelInfo entries, one per table.
                     :  * missing_ok:  if true, a table that has no pg_publication_rel entry for
                     :  *              this publication is silently skipped; if false that case
                     :  *              raises ERRCODE_UNDEFINED_OBJECT.
                     :  *
                     :  * A column list or a WHERE clause attached to an entry is rejected with
                     :  * ERRCODE_SYNTAX_ERROR.  Note the order of the checks: the column-list
                     :  * check runs before the membership lookup, whereas the WHERE-clause
                     :  * check runs after it, so with missing_ok a non-member table carrying a
                     :  * WHERE clause is skipped rather than rejected.
                     :  *
                     :  * Each matching pg_publication_rel row is removed via performDeletion()
                     :  * with DROP_CASCADE.
    1779             :  */
    1780             : static void
    1781         484 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
    1782             : {
    1783             :     ObjectAddress obj;
    1784             :     ListCell   *lc;
    1785             :     Oid         prid; /* OID of the pg_publication_rel row, if any */
    1786             : 
    1787         926 :     foreach(lc, rels)
    1788             :     {
    1789         460 :         PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
    1790         460 :         Relation    rel = pubrel->relation;
    1791         460 :         Oid         relid = RelationGetRelid(rel);
    1792             : 
    1793         460 :         if (pubrel->columns)
    1794           0 :             ereport(ERROR,
    1795             :                     errcode(ERRCODE_SYNTAX_ERROR),
    1796             :                     errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
    1797             : 
    1798         460 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
    1799             :                                ObjectIdGetDatum(relid),
    1800             :                                ObjectIdGetDatum(pubid));
    1801         460 :         if (!OidIsValid(prid))
    1802             :         {
    1803          12 :             if (missing_ok)
    1804           0 :                 continue;
    1805             : 
    1806          12 :             ereport(ERROR,
    1807             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1808             :                      errmsg("relation \"%s\" is not part of the publication",
    1809             :                             RelationGetRelationName(rel))));
    1810             :         }
    1811             : 
    1812         448 :         if (pubrel->whereClause)
    1813           6 :             ereport(ERROR,
    1814             :                     (errcode(ERRCODE_SYNTAX_ERROR),
    1815             :                      errmsg("cannot use a WHERE clause when removing a table from a publication")));
    1816             : 
    1817         442 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
    1818         442 :         performDeletion(&obj, DROP_CASCADE, 0);
    1819             :     }
    1820         466 : }
    1821             : 
    1822             : /*
    1823             :  * Add listed schemas to the publication.
                     :  *
                     :  * pubid:          OID of the publication the schemas are added to.
                     :  * schemas:        list of schema OIDs.
                     :  * if_not_exists:  passed through unchanged to publication_add_schema().
                     :  * stmt:           the ALTER PUBLICATION statement, or NULL.  When it is
                     :  *                 non-NULL, each addition is also reported through
                     :  *                 EventTriggerCollectSimpleCommand() and the object
                     :  *                 post-create hook; when it is NULL both are skipped.
                     :  *
                     :  * No permission check is made in this function itself.
    1824             :  */
    1825             : static void
    1826         548 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
    1827             :                       AlterPublicationStmt *stmt)
    1828             : {
    1829             :     ListCell   *lc;
    1830             : 
    1831             :     Assert(!stmt || !stmt->for_all_tables);
    1832             : 
    1833         758 :     foreach(lc, schemas)
    1834             :     {
    1835         222 :         Oid         schemaid = lfirst_oid(lc);
    1836             :         ObjectAddress obj;
    1837             : 
    1838         222 :         obj = publication_add_schema(pubid, schemaid, if_not_exists);
    1839         210 :         if (stmt)
    1840             :         {
    1841          58 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1842             :                                              (Node *) stmt);
    1843             : 
    1844          58 :             InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
    1845             :                                        obj.objectId, 0);
    1846             :         }
    1847             :     }
    1848         536 : }
    1849             : 
    1850             : /*
    1851             :  * Remove listed schemas from the publication.
                     :  *
                     :  * pubid:       OID of the publication the schemas are removed from.
                     :  * schemas:     list of schema OIDs.
                     :  * missing_ok:  if true, a schema that has no pg_publication_namespace
                     :  *              entry for this publication is silently skipped; if false
                     :  *              that case raises ERRCODE_UNDEFINED_OBJECT.
                     :  *
                     :  * Each matching pg_publication_namespace row is removed via
                     :  * performDeletion() with DROP_CASCADE.
    1852             :  */
    1853             : static void
    1854         430 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
    1855             : {
    1856             :     ObjectAddress obj;
    1857             :     ListCell   *lc;
    1858             :     Oid         psid; /* OID of the pg_publication_namespace row, if any */
    1859             : 
    1860         480 :     foreach(lc, schemas)
    1861             :     {
    1862          56 :         Oid         schemaid = lfirst_oid(lc);
    1863             : 
    1864          56 :         psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
    1865             :                                Anum_pg_publication_namespace_oid,
    1866             :                                ObjectIdGetDatum(schemaid),
    1867             :                                ObjectIdGetDatum(pubid));
    1868          56 :         if (!OidIsValid(psid))
    1869             :         {
    1870           6 :             if (missing_ok)
    1871           0 :                 continue;
    1872             : 
    1873           6 :             ereport(ERROR,
    1874             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1875             :                      errmsg("tables from schema \"%s\" are not part of the publication",
    1876             :                             get_namespace_name(schemaid))));
    1877             :         }
    1878             : 
    1879          50 :         ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
    1880          50 :         performDeletion(&obj, DROP_CASCADE, 0);
    1881             :     }
    1882         424 : }
    1883             : 
    1884             : /*
    1885             :  * Internal workhorse for changing a publication owner
                     :  *
                     :  * rel:         the opened pg_publication catalog relation; the updated
                     :  *              tuple is written back through it.
                     :  * tup:         the publication's tuple.  Its pubowner field is
                     :  *              overwritten in place, so this must be a modifiable copy
                     :  *              (both callers in this file pass the result of
                     :  *              SearchSysCacheCopy1()).
                     :  * newOwnerId:  OID of the role that becomes the owner.
                     :  *
                     :  * Returns without doing anything if the publication is already owned by
                     :  * newOwnerId.  Otherwise, unless the current user is a superuser, the
                     :  * following are enforced in this order:
                     :  *   - the current user must own the publication;
                     :  *   - the current user must pass check_can_set_role() for newOwnerId;
                     :  *   - newOwnerId must have CREATE privilege on the current database;
                     :  *   - a FOR ALL TABLES publication requires newOwnerId to be a superuser;
                     :  *   - a FOR TABLES IN SCHEMA publication requires the same.
                     :  * All of these checks are skipped when the current user is a superuser.
                     :  *
                     :  * After the catalog update the owner dependency is switched to the new
                     :  * owner and the object post-alter hook is invoked.
    1886             :  */
    1887             : static void
    1888          26 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    1889             : {
    1890             :     Form_pg_publication form;
    1891             : 
    1892          26 :     form = (Form_pg_publication) GETSTRUCT(tup);
    1893             : 
    1894          26 :     if (form->pubowner == newOwnerId)
    1895           2 :         return;
    1896             : 
    1897          24 :     if (!superuser())
    1898             :     {
    1899             :         AclResult   aclresult;
    1900             : 
    1901             :         /* Must be owner */
    1902          12 :         if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
    1903           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1904           0 :                            NameStr(form->pubname));
    1905             : 
    1906             :         /* Must be able to become new owner */
    1907          12 :         check_can_set_role(GetUserId(), newOwnerId);
    1908             : 
    1909             :         /* New owner must have CREATE privilege on database */
    1910          12 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
    1911          12 :         if (aclresult != ACLCHECK_OK)
    1912           0 :             aclcheck_error(aclresult, OBJECT_DATABASE,
    1913           0 :                            get_database_name(MyDatabaseId));
    1914             : 
    1915          12 :         if (form->puballtables && !superuser_arg(newOwnerId))
    1916           0 :             ereport(ERROR,
    1917             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1918             :                      errmsg("permission denied to change owner of publication \"%s\"",
    1919             :                             NameStr(form->pubname)),
    1920             :                      errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
    1921             : 
    1922          12 :         if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
    1923           6 :             ereport(ERROR,
    1924             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1925             :                      errmsg("permission denied to change owner of publication \"%s\"",
    1926             :                             NameStr(form->pubname)),
    1927             :                      errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
    1928             :     }
    1929             : 
    1930          18 :     form->pubowner = newOwnerId;
    1931          18 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1932             : 
    1933             :     /* Update owner dependency reference */
    1934          18 :     changeDependencyOnOwner(PublicationRelationId,
    1935             :                             form->oid,
    1936             :                             newOwnerId);
    1937             : 
    1938          18 :     InvokeObjectPostAlterHook(PublicationRelationId,
    1939             :                               form->oid, 0);
    1940             : }
    1941             : 
    1942             : /*
    1943             :  * Change publication owner -- by name
                     :  *
                     :  * name:        name of the publication to look up.
                     :  * newOwnerId:  OID of the role that becomes the owner.
                     :  *
                     :  * Opens pg_publication with RowExclusiveLock, fetches a copy of the
                     :  * publication's tuple by name and delegates the checks and the update to
                     :  * AlterPublicationOwner_internal().  Raises ERRCODE_UNDEFINED_OBJECT if
                     :  * no publication with that name exists.
                     :  *
                     :  * Returns the ObjectAddress of the publication.  The OID is captured
                     :  * before the tuple copy is freed.
    1944             :  */
    1945             : ObjectAddress
    1946          26 : AlterPublicationOwner(const char *name, Oid newOwnerId)
    1947             : {
    1948             :     Oid         subid; /* holds the publication's OID, despite the name */
    1949             :     HeapTuple   tup;
    1950             :     Relation    rel;
    1951             :     ObjectAddress address;
    1952             :     Form_pg_publication pubform;
    1953             : 
    1954          26 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1955             : 
    1956          26 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
    1957             : 
    1958          26 :     if (!HeapTupleIsValid(tup))
    1959           0 :         ereport(ERROR,
    1960             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1961             :                  errmsg("publication \"%s\" does not exist", name)));
    1962             : 
    1963          26 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1964          26 :     subid = pubform->oid;
    1965             : 
    1966          26 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    1967             : 
    1968          20 :     ObjectAddressSet(address, PublicationRelationId, subid);
    1969             : 
    1970          20 :     heap_freetuple(tup);
    1971             : 
    1972          20 :     table_close(rel, RowExclusiveLock);
    1973             : 
    1974          20 :     return address;
    1975             : }
    1976             : 
    1977             : /*
    1978             :  * Change publication owner -- by OID
                     :  *
                     :  * subid:       OID of the publication (it is looked up through the
                     :  *              PUBLICATIONOID syscache, despite the parameter's name).
                     :  * newOwnerId:  OID of the role that becomes the owner.
                     :  *
                     :  * Opens pg_publication with RowExclusiveLock, fetches a copy of the
                     :  * publication's tuple by OID and delegates the checks and the update to
                     :  * AlterPublicationOwner_internal().  Raises ERRCODE_UNDEFINED_OBJECT if
                     :  * no publication with that OID exists.  Nothing is returned.
    1979             :  */
    1980             : void
    1981           0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
    1982             : {
    1983             :     HeapTuple   tup;
    1984             :     Relation    rel;
    1985             : 
    1986           0 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1987             : 
    1988           0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
    1989             : 
    1990           0 :     if (!HeapTupleIsValid(tup))
    1991           0 :         ereport(ERROR,
    1992             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1993             :                  errmsg("publication with OID %u does not exist", subid)));
    1994             : 
    1995           0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    1996             : 
    1997           0 :     heap_freetuple(tup);
    1998             : 
    1999           0 :     table_close(rel, RowExclusiveLock);
    2000           0 : }

Generated by: LCOV version 1.14