LCOV - code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 616 665 92.6 %
Date: 2025-01-18 04:15:08 Functions: 29 30 96.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * publicationcmds.c
       4             :  *      publication manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/commands/publicationcmds.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/htup_details.h"
      18             : #include "access/table.h"
      19             : #include "access/xact.h"
      20             : #include "catalog/catalog.h"
      21             : #include "catalog/indexing.h"
      22             : #include "catalog/namespace.h"
      23             : #include "catalog/objectaccess.h"
      24             : #include "catalog/objectaddress.h"
      25             : #include "catalog/pg_database.h"
      26             : #include "catalog/pg_inherits.h"
      27             : #include "catalog/pg_namespace.h"
      28             : #include "catalog/pg_proc.h"
      29             : #include "catalog/pg_publication.h"
      30             : #include "catalog/pg_publication_namespace.h"
      31             : #include "catalog/pg_publication_rel.h"
      32             : #include "commands/dbcommands.h"
      33             : #include "commands/defrem.h"
      34             : #include "commands/event_trigger.h"
      35             : #include "commands/publicationcmds.h"
      36             : #include "miscadmin.h"
      37             : #include "nodes/nodeFuncs.h"
      38             : #include "parser/parse_clause.h"
      39             : #include "parser/parse_collate.h"
      40             : #include "parser/parse_relation.h"
      41             : #include "storage/lmgr.h"
      42             : #include "utils/acl.h"
      43             : #include "utils/builtins.h"
      44             : #include "utils/inval.h"
      45             : #include "utils/lsyscache.h"
      46             : #include "utils/rel.h"
      47             : #include "utils/syscache.h"
      48             : #include "utils/varlena.h"
      49             : 
      50             : 
      51             : /*
      52             :  * Information used to validate the columns in the row filter expression. See
      53             :  * contain_invalid_rfcolumn_walker for details.
      54             :  */
      55             : typedef struct rf_context
      56             : {
      57             :     Bitmapset  *bms_replident;  /* bitset of replica identity columns */
      58             :     bool        pubviaroot;     /* true if we are validating the parent
      59             :                                  * relation's row filter */
      60             :     Oid         relid;          /* relid of the relation */
      61             :     Oid         parentid;       /* relid of the parent relation */
      62             : } rf_context;
      63             : 
      64             : static List *OpenTableList(List *tables);
      65             : static void CloseTableList(List *rels);
      66             : static void LockSchemaList(List *schemalist);
      67             : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
      68             :                                  AlterPublicationStmt *stmt);
      69             : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
      70             : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
      71             :                                   AlterPublicationStmt *stmt);
      72             : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
      73             : 
      74             : 
      75             : static void
      76         856 : parse_publication_options(ParseState *pstate,
      77             :                           List *options,
      78             :                           bool *publish_given,
      79             :                           PublicationActions *pubactions,
      80             :                           bool *publish_via_partition_root_given,
      81             :                           bool *publish_via_partition_root,
      82             :                           bool *publish_generated_columns_given,
      83             :                           bool *publish_generated_columns)
      84             : {
      85             :     ListCell   *lc;
      86             : 
      87         856 :     *publish_given = false;
      88         856 :     *publish_via_partition_root_given = false;
      89         856 :     *publish_generated_columns_given = false;
      90             : 
      91             :     /* defaults */
      92         856 :     pubactions->pubinsert = true;
      93         856 :     pubactions->pubupdate = true;
      94         856 :     pubactions->pubdelete = true;
      95         856 :     pubactions->pubtruncate = true;
      96         856 :     *publish_via_partition_root = false;
      97         856 :     *publish_generated_columns = false;
      98             : 
      99             :     /* Parse options */
     100        1180 :     foreach(lc, options)
     101             :     {
     102         354 :         DefElem    *defel = (DefElem *) lfirst(lc);
     103             : 
     104         354 :         if (strcmp(defel->defname, "publish") == 0)
     105             :         {
     106             :             char       *publish;
     107             :             List       *publish_list;
     108             :             ListCell   *lc2;
     109             : 
     110         122 :             if (*publish_given)
     111           0 :                 errorConflictingDefElem(defel, pstate);
     112             : 
     113             :             /*
     114             :              * If publish option was given only the explicitly listed actions
     115             :              * should be published.
     116             :              */
     117         122 :             pubactions->pubinsert = false;
     118         122 :             pubactions->pubupdate = false;
     119         122 :             pubactions->pubdelete = false;
     120         122 :             pubactions->pubtruncate = false;
     121             : 
     122         122 :             *publish_given = true;
     123         122 :             publish = defGetString(defel);
     124             : 
     125         122 :             if (!SplitIdentifierString(publish, ',', &publish_list))
     126           0 :                 ereport(ERROR,
     127             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     128             :                          errmsg("invalid list syntax in parameter \"%s\"",
     129             :                                 "publish")));
     130             : 
     131             :             /* Process the option list. */
     132         270 :             foreach(lc2, publish_list)
     133             :             {
     134         154 :                 char       *publish_opt = (char *) lfirst(lc2);
     135             : 
     136         154 :                 if (strcmp(publish_opt, "insert") == 0)
     137         108 :                     pubactions->pubinsert = true;
     138          46 :                 else if (strcmp(publish_opt, "update") == 0)
     139          20 :                     pubactions->pubupdate = true;
     140          26 :                 else if (strcmp(publish_opt, "delete") == 0)
     141          10 :                     pubactions->pubdelete = true;
     142          16 :                 else if (strcmp(publish_opt, "truncate") == 0)
     143          10 :                     pubactions->pubtruncate = true;
     144             :                 else
     145           6 :                     ereport(ERROR,
     146             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     147             :                              errmsg("unrecognized value for publication option \"%s\": \"%s\"",
     148             :                                     "publish", publish_opt)));
     149             :             }
     150             :         }
     151         232 :         else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
     152             :         {
     153         162 :             if (*publish_via_partition_root_given)
     154           6 :                 errorConflictingDefElem(defel, pstate);
     155         156 :             *publish_via_partition_root_given = true;
     156         156 :             *publish_via_partition_root = defGetBoolean(defel);
     157             :         }
     158          70 :         else if (strcmp(defel->defname, "publish_generated_columns") == 0)
     159             :         {
     160          64 :             if (*publish_generated_columns_given)
     161           6 :                 errorConflictingDefElem(defel, pstate);
     162          58 :             *publish_generated_columns_given = true;
     163          58 :             *publish_generated_columns = defGetBoolean(defel);
     164             :         }
     165             :         else
     166           6 :             ereport(ERROR,
     167             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     168             :                      errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
     169             :     }
     170         826 : }
     171             : 
     172             : /*
     173             :  * Convert the PublicationObjSpecType list into schema oid list and
     174             :  * PublicationTable list.
     175             :  */
     176             : static void
     177        1572 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
     178             :                            List **rels, List **schemas)
     179             : {
     180             :     ListCell   *cell;
     181             :     PublicationObjSpec *pubobj;
     182             : 
     183        1572 :     if (!pubobjspec_list)
     184          86 :         return;
     185             : 
     186        3134 :     foreach(cell, pubobjspec_list)
     187             :     {
     188             :         Oid         schemaid;
     189             :         List       *search_path;
     190             : 
     191        1684 :         pubobj = (PublicationObjSpec *) lfirst(cell);
     192             : 
     193        1684 :         switch (pubobj->pubobjtype)
     194             :         {
     195        1328 :             case PUBLICATIONOBJ_TABLE:
     196        1328 :                 *rels = lappend(*rels, pubobj->pubtable);
     197        1328 :                 break;
     198         332 :             case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
     199         332 :                 schemaid = get_namespace_oid(pubobj->name, false);
     200             : 
     201             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     202         302 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     203         302 :                 break;
     204          24 :             case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
     205          24 :                 search_path = fetch_search_path(false);
     206          24 :                 if (search_path == NIL) /* nothing valid in search_path? */
     207           6 :                     ereport(ERROR,
     208             :                             errcode(ERRCODE_UNDEFINED_SCHEMA),
     209             :                             errmsg("no schema has been selected for CURRENT_SCHEMA"));
     210             : 
     211          18 :                 schemaid = linitial_oid(search_path);
     212          18 :                 list_free(search_path);
     213             : 
     214             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     215          18 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     216          18 :                 break;
     217           0 :             default:
     218             :                 /* shouldn't happen */
     219           0 :                 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
     220             :                 break;
     221             :         }
     222             :     }
     223             : }
     224             : 
     225             : /*
     226             :  * Returns true if any of the columns used in the row filter WHERE expression is
     227             :  * not part of REPLICA IDENTITY, false otherwise.
     228             :  */
     229             : static bool
     230         258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
     231             : {
     232         258 :     if (node == NULL)
     233           0 :         return false;
     234             : 
     235         258 :     if (IsA(node, Var))
     236             :     {
     237         104 :         Var        *var = (Var *) node;
     238         104 :         AttrNumber  attnum = var->varattno;
     239             : 
     240             :         /*
     241             :          * If pubviaroot is true, we are validating the row filter of the
     242             :          * parent table, but the bitmap contains the replica identity
     243             :          * information of the child table. So, get the column number of the
     244             :          * child table as parent and child column order could be different.
     245             :          */
     246         104 :         if (context->pubviaroot)
     247             :         {
     248          16 :             char       *colname = get_attname(context->parentid, attnum, false);
     249             : 
     250          16 :             attnum = get_attnum(context->relid, colname);
     251             :         }
     252             : 
     253         104 :         if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
     254         104 :                            context->bms_replident))
     255          60 :             return true;
     256             :     }
     257             : 
     258         198 :     return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
     259             :                                   context);
     260             : }
     261             : 
     262             : /*
     263             :  * Check if all columns referenced in the filter expression are part of the
     264             :  * REPLICA IDENTITY index or not.
     265             :  *
     266             :  * Returns true if any invalid column is found.
     267             :  */
     268             : bool
     269         698 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     270             :                                bool pubviaroot)
     271             : {
     272             :     HeapTuple   rftuple;
     273         698 :     Oid         relid = RelationGetRelid(relation);
     274         698 :     Oid         publish_as_relid = RelationGetRelid(relation);
     275         698 :     bool        result = false;
     276             :     Datum       rfdatum;
     277             :     bool        rfisnull;
     278             : 
     279             :     /*
     280             :      * FULL means all columns are in the REPLICA IDENTITY, so all columns are
     281             :      * allowed in the row filter and we can skip the validation.
     282             :      */
     283         698 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     284         174 :         return false;
     285             : 
     286             :     /*
     287             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     288             :      * is published via this publication as we need to use its row filter
     289             :      * expression to filter the partition's changes.
     290             :      *
     291             :      * Note that even though the row filter used is for an ancestor, the
     292             :      * REPLICA IDENTITY used will be for the actual child table.
     293             :      */
     294         524 :     if (pubviaroot && relation->rd_rel->relispartition)
     295             :     {
     296             :         publish_as_relid
     297         108 :             = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     298             : 
     299         108 :         if (!OidIsValid(publish_as_relid))
     300           6 :             publish_as_relid = relid;
     301             :     }
     302             : 
     303         524 :     rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     304             :                               ObjectIdGetDatum(publish_as_relid),
     305             :                               ObjectIdGetDatum(pubid));
     306             : 
     307         524 :     if (!HeapTupleIsValid(rftuple))
     308          56 :         return false;
     309             : 
     310         468 :     rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     311             :                               Anum_pg_publication_rel_prqual,
     312             :                               &rfisnull);
     313             : 
     314         468 :     if (!rfisnull)
     315             :     {
     316         102 :         rf_context  context = {0};
     317             :         Node       *rfnode;
     318         102 :         Bitmapset  *bms = NULL;
     319             : 
     320         102 :         context.pubviaroot = pubviaroot;
     321         102 :         context.parentid = publish_as_relid;
     322         102 :         context.relid = relid;
     323             : 
     324             :         /* Remember columns that are part of the REPLICA IDENTITY */
     325         102 :         bms = RelationGetIndexAttrBitmap(relation,
     326             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     327             : 
     328         102 :         context.bms_replident = bms;
     329         102 :         rfnode = stringToNode(TextDatumGetCString(rfdatum));
     330         102 :         result = contain_invalid_rfcolumn_walker(rfnode, &context);
     331             :     }
     332             : 
     333         468 :     ReleaseSysCache(rftuple);
     334             : 
     335         468 :     return result;
     336             : }
     337             : 
     338             : /*
     339             :  * Check for invalid columns in the publication table definition.
     340             :  *
     341             :  * This function evaluates two conditions:
     342             :  *
     343             :  * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
     344             :  *    by the column list. If any column is missing, *invalid_column_list is set
     345             :  *    to true.
     346             :  * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
     347             :  *    are published either by listing them in the column list or by enabling
     348             :  *    publish_generated_columns option. If any unpublished generated column is
     349             :  *    found, *invalid_gen_col is set to true.
     350             :  *
     351             :  * Returns true if any of the above conditions are not met.
     352             :  */
     353             : bool
     354         890 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     355             :                             bool pubviaroot, bool pubgencols,
     356             :                             bool *invalid_column_list,
     357             :                             bool *invalid_gen_col)
     358             : {
     359         890 :     Oid         relid = RelationGetRelid(relation);
     360         890 :     Oid         publish_as_relid = RelationGetRelid(relation);
     361             :     Bitmapset  *idattrs;
     362         890 :     Bitmapset  *columns = NULL;
     363         890 :     TupleDesc   desc = RelationGetDescr(relation);
     364             :     Publication *pub;
     365             :     int         x;
     366             : 
     367         890 :     *invalid_column_list = false;
     368         890 :     *invalid_gen_col = false;
     369             : 
     370             :     /*
     371             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     372             :      * is published via this publication as we need to use its column list for
     373             :      * the changes.
     374             :      *
     375             :      * Note that even though the column list used is for an ancestor, the
     376             :      * REPLICA IDENTITY used will be for the actual child table.
     377             :      */
     378         890 :     if (pubviaroot && relation->rd_rel->relispartition)
     379             :     {
     380         160 :         publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     381             : 
     382         160 :         if (!OidIsValid(publish_as_relid))
     383          36 :             publish_as_relid = relid;
     384             :     }
     385             : 
     386             :     /* Fetch the column list */
     387         890 :     pub = GetPublication(pubid);
     388         890 :     check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
     389             : 
     390         890 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     391             :     {
     392             :         /* With REPLICA IDENTITY FULL, no column list is allowed. */
     393         188 :         *invalid_column_list = (columns != NULL);
     394             : 
     395             :         /*
     396             :          * As we don't allow a column list with REPLICA IDENTITY FULL, the
     397             :          * publish_generated_columns option must be set to true if the table
     398             :          * has any stored generated columns.
     399             :          */
     400         188 :         if (!pubgencols &&
     401         182 :             relation->rd_att->constr &&
     402          70 :             relation->rd_att->constr->has_generated_stored)
     403           6 :             *invalid_gen_col = true;
     404             : 
     405         188 :         if (*invalid_gen_col && *invalid_column_list)
     406           0 :             return true;
     407             :     }
     408             : 
     409             :     /* Remember columns that are part of the REPLICA IDENTITY */
     410         890 :     idattrs = RelationGetIndexAttrBitmap(relation,
     411             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     412             : 
     413             :     /*
     414             :      * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
     415             :      * (to handle system columns the usual way), while column list does not
     416             :      * use offset, so we can't do bms_is_subset(). Instead, we have to loop
     417             :      * over the idattrs and check all of them are in the list.
     418             :      */
     419         890 :     x = -1;
     420        1536 :     while ((x = bms_next_member(idattrs, x)) >= 0)
     421             :     {
     422         652 :         AttrNumber  attnum = (x + FirstLowInvalidHeapAttributeNumber);
     423         652 :         Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
     424             : 
     425         652 :         if (columns == NULL)
     426             :         {
     427             :             /*
     428             :              * The publish_generated_columns option must be set to true if the
     429             :              * REPLICA IDENTITY contains any stored generated column.
     430             :              */
     431         442 :             if (!pubgencols && att->attgenerated)
     432             :             {
     433           6 :                 *invalid_gen_col = true;
     434           6 :                 break;
     435             :             }
     436             : 
     437             :             /* Skip validating the column list since it is not defined */
     438         436 :             continue;
     439             :         }
     440             : 
     441             :         /*
     442             :          * If pubviaroot is true, we are validating the column list of the
     443             :          * parent table, but the bitmap contains the replica identity
     444             :          * information of the child table. The parent/child attnums may not
     445             :          * match, so translate them to the parent - get the attname from the
     446             :          * child, and look it up in the parent.
     447             :          */
     448         210 :         if (pubviaroot)
     449             :         {
     450             :             /* attribute name in the child table */
     451          80 :             char       *colname = get_attname(relid, attnum, false);
     452             : 
     453             :             /*
     454             :              * Determine the attnum for the attribute name in parent (we are
     455             :              * using the column list defined on the parent).
     456             :              */
     457          80 :             attnum = get_attnum(publish_as_relid, colname);
     458             :         }
     459             : 
     460             :         /* replica identity column, not covered by the column list */
     461         210 :         *invalid_column_list |= !bms_is_member(attnum, columns);
     462             : 
     463         210 :         if (*invalid_column_list && *invalid_gen_col)
     464           0 :             break;
     465             :     }
     466             : 
     467         890 :     bms_free(columns);
     468         890 :     bms_free(idattrs);
     469             : 
     470         890 :     return *invalid_column_list || *invalid_gen_col;
     471             : }
     472             : 
     473             : /* check_functions_in_node callback */
     474             : static bool
     475         396 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
     476             : {
     477         396 :     return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
     478             :             func_id >= FirstNormalObjectId);
     479             : }
     480             : 
     481             : /*
     482             :  * The row filter walker checks if the row filter expression is a "simple
     483             :  * expression".
     484             :  *
     485             :  * It allows only simple or compound expressions such as:
     486             :  * - (Var Op Const)
     487             :  * - (Var Op Var)
     488             :  * - (Var Op Const) AND/OR (Var Op Const)
     489             :  * - etc
     490             :  * (where Var is a column of the table this filter belongs to)
     491             :  *
     492             :  * The simple expression has the following restrictions:
     493             :  * - User-defined operators are not allowed;
     494             :  * - User-defined functions are not allowed;
     495             :  * - User-defined types are not allowed;
     496             :  * - User-defined collations are not allowed;
     497             :  * - Non-immutable built-in functions are not allowed;
     498             :  * - System columns are not allowed.
     499             :  *
     500             :  * NOTES
     501             :  *
     502             :  * We don't allow user-defined functions/operators/types/collations because
     503             :  * (a) if a user drops a user-defined object used in a row filter expression or
     504             :  * if there is any other error while using it, the logical decoding
     505             :  * infrastructure won't be able to recover from such an error even if the
     506             :  * object is recreated again because a historic snapshot is used to evaluate
     507             :  * the row filter;
     508             :  * (b) a user-defined function can be used to access tables that could have
     509             :  * unpleasant results because a historic snapshot is used. That's why only
     510             :  * immutable built-in functions are allowed in row filter expressions.
     511             :  *
     512             :  * We don't allow system columns because currently, we don't have that
     513             :  * information in the tuple passed to downstream. Also, as we don't replicate
     514             :  * those to subscribers, there doesn't seem to be a need for a filter on those
     515             :  * columns.
     516             :  *
     517             :  * We can allow other node types after more analysis and testing.
     518             :  */
     519             : static bool
     520        1328 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
     521             : {
     522        1328 :     char       *errdetail_msg = NULL;
     523             : 
     524        1328 :     if (node == NULL)
     525           6 :         return false;
     526             : 
     527        1322 :     switch (nodeTag(node))
     528             :     {
     529         356 :         case T_Var:
     530             :             /* System columns are not allowed. */
     531         356 :             if (((Var *) node)->varattno < InvalidAttrNumber)
     532           6 :                 errdetail_msg = _("System columns are not allowed.");
     533         356 :             break;
     534         352 :         case T_OpExpr:
     535             :         case T_DistinctExpr:
     536             :         case T_NullIfExpr:
     537             :             /* OK, except user-defined operators are not allowed. */
     538         352 :             if (((OpExpr *) node)->opno >= FirstNormalObjectId)
     539           6 :                 errdetail_msg = _("User-defined operators are not allowed.");
     540         352 :             break;
     541           6 :         case T_ScalarArrayOpExpr:
     542             :             /* OK, except user-defined operators are not allowed. */
     543           6 :             if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
     544           0 :                 errdetail_msg = _("User-defined operators are not allowed.");
     545             : 
     546             :             /*
     547             :              * We don't need to check the hashfuncid and negfuncid of
     548             :              * ScalarArrayOpExpr as those functions are only built for a
     549             :              * subquery.
     550             :              */
     551           6 :             break;
     552           6 :         case T_RowCompareExpr:
     553             :             {
     554             :                 ListCell   *opid;
     555             : 
     556             :                 /* OK, except user-defined operators are not allowed. */
     557          18 :                 foreach(opid, ((RowCompareExpr *) node)->opnos)
     558             :                 {
     559          12 :                     if (lfirst_oid(opid) >= FirstNormalObjectId)
     560             :                     {
     561           0 :                         errdetail_msg = _("User-defined operators are not allowed.");
     562           0 :                         break;
     563             :                     }
     564             :                 }
     565             :             }
     566           6 :             break;
     567         596 :         case T_Const:
     568             :         case T_FuncExpr:
     569             :         case T_BoolExpr:
     570             :         case T_RelabelType:
     571             :         case T_CollateExpr:
     572             :         case T_CaseExpr:
     573             :         case T_CaseTestExpr:
     574             :         case T_ArrayExpr:
     575             :         case T_RowExpr:
     576             :         case T_CoalesceExpr:
     577             :         case T_MinMaxExpr:
     578             :         case T_XmlExpr:
     579             :         case T_NullTest:
     580             :         case T_BooleanTest:
     581             :         case T_List:
     582             :             /* OK, supported */
     583         596 :             break;
     584           6 :         default:
     585           6 :             errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
     586           6 :             break;
     587             :     }
     588             : 
     589             :     /*
     590             :      * For all the supported nodes, if we haven't already found a problem,
     591             :      * check the types, functions, and collations used in it.  We check List
     592             :      * by walking through each element.
     593             :      */
     594        1322 :     if (!errdetail_msg && !IsA(node, List))
     595             :     {
     596        1250 :         if (exprType(node) >= FirstNormalObjectId)
     597           6 :             errdetail_msg = _("User-defined types are not allowed.");
     598        1244 :         else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
     599             :                                          pstate))
     600          12 :             errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
     601        2464 :         else if (exprCollation(node) >= FirstNormalObjectId ||
     602        1232 :                  exprInputCollation(node) >= FirstNormalObjectId)
     603           6 :             errdetail_msg = _("User-defined collations are not allowed.");
     604             :     }
     605             : 
     606             :     /*
     607             :      * If we found a problem in this node, throw error now. Otherwise keep
     608             :      * going.
     609             :      */
     610        1322 :     if (errdetail_msg)
     611          42 :         ereport(ERROR,
     612             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     613             :                  errmsg("invalid publication WHERE expression"),
     614             :                  errdetail_internal("%s", errdetail_msg),
     615             :                  parser_errposition(pstate, exprLocation(node))));
     616             : 
     617        1280 :     return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
     618             :                                   pstate);
     619             : }
     620             : 
     621             : /*
     622             :  * Check if the row filter expression is a "simple expression".
     623             :  *
     624             :  * See check_simple_rowfilter_expr_walker for details.
     625             :  */
     626             : static bool
     627         344 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
     628             : {
     629         344 :     return check_simple_rowfilter_expr_walker(node, pstate);
     630             : }
     631             : 
     632             : /*
     633             :  * Transform the publication WHERE expression for all the relations in the list,
     634             :  * ensuring it is coerced to boolean and necessary collation information is
     635             :  * added if required, and add a new nsitem/RTE for the associated relation to
     636             :  * the ParseState's namespace list.
     637             :  *
     638             :  * Also check the publication row filter expression and throw an error if
     639             :  * anything not permitted or unexpected is encountered.
     640             :  */
     641             : static void
     642        1106 : TransformPubWhereClauses(List *tables, const char *queryString,
     643             :                          bool pubviaroot)
     644             : {
     645             :     ListCell   *lc;
     646             : 
     647        2212 :     foreach(lc, tables)
     648             :     {
     649             :         ParseNamespaceItem *nsitem;
     650        1166 :         Node       *whereclause = NULL;
     651             :         ParseState *pstate;
     652        1166 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     653             : 
     654        1166 :         if (pri->whereClause == NULL)
     655         804 :             continue;
     656             : 
     657             :         /*
     658             :          * If the publication doesn't publish changes via the root partitioned
     659             :          * table, the partition's row filter will be used. So disallow using
     660             :          * WHERE clause on partitioned table in this case.
     661             :          */
     662         362 :         if (!pubviaroot &&
     663         340 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     664           6 :             ereport(ERROR,
     665             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     666             :                      errmsg("cannot use publication WHERE clause for relation \"%s\"",
     667             :                             RelationGetRelationName(pri->relation)),
     668             :                      errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
     669             :                                "publish_via_partition_root")));
     670             : 
     671             :         /*
     672             :          * A fresh pstate is required so that we only have "this" table in its
     673             :          * rangetable
     674             :          */
     675         356 :         pstate = make_parsestate(NULL);
     676         356 :         pstate->p_sourcetext = queryString;
     677         356 :         nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
     678             :                                                AccessShareLock, NULL,
     679             :                                                false, false);
     680         356 :         addNSItemToQuery(pstate, nsitem, false, true, true);
     681             : 
     682         356 :         whereclause = transformWhereClause(pstate,
     683         356 :                                            copyObject(pri->whereClause),
     684             :                                            EXPR_KIND_WHERE,
     685             :                                            "PUBLICATION WHERE");
     686             : 
     687             :         /* Fix up collation information */
     688         344 :         assign_expr_collations(pstate, whereclause);
     689             : 
     690             :         /*
     691             :          * We allow only simple expressions in row filters. See
     692             :          * check_simple_rowfilter_expr_walker.
     693             :          */
     694         344 :         check_simple_rowfilter_expr(whereclause, pstate);
     695             : 
     696         302 :         free_parsestate(pstate);
     697             : 
     698         302 :         pri->whereClause = whereclause;
     699             :     }
     700        1046 : }
     701             : 
     702             : 
     703             : /*
     704             :  * Given a list of tables that are going to be added to a publication,
     705             :  * verify that they fulfill the necessary preconditions, namely: no tables
     706             :  * have a column list if any schema is published; and partitioned tables do
     707             :  * not have column lists if publish_via_partition_root is not set.
     708             :  *
     709             :  * 'publish_schema' indicates that the publication contains any TABLES IN
     710             :  * SCHEMA elements (newly added in this command, or preexisting).
     711             :  * 'pubviaroot' is the value of publish_via_partition_root.
     712             :  */
     713             : static void
     714        1046 : CheckPubRelationColumnList(char *pubname, List *tables,
     715             :                            bool publish_schema, bool pubviaroot)
     716             : {
     717             :     ListCell   *lc;
     718             : 
     719        2122 :     foreach(lc, tables)
     720             :     {
     721        1106 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     722             : 
     723        1106 :         if (pri->columns == NIL)
     724         732 :             continue;
     725             : 
     726             :         /*
     727             :          * Disallow specifying column list if any schema is in the
     728             :          * publication.
     729             :          *
     730             :          * XXX We could instead just forbid the case when the publication
     731             :          * tries to publish the table with a column list and a schema for that
     732             :          * table. However, if we do that then we need a restriction during
     733             :          * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
     734             :          * seem to be a good idea.
     735             :          */
     736         374 :         if (publish_schema)
     737          24 :             ereport(ERROR,
     738             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     739             :                     errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     740             :                            get_namespace_name(RelationGetNamespace(pri->relation)),
     741             :                            RelationGetRelationName(pri->relation), pubname),
     742             :                     errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
     743             : 
     744             :         /*
     745             :          * If the publication doesn't publish changes via the root partitioned
     746             :          * table, the partition's column list will be used. So disallow using
     747             :          * a column list on the partitioned table in this case.
     748             :          */
     749         350 :         if (!pubviaroot &&
     750         274 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     751           6 :             ereport(ERROR,
     752             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     753             :                      errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     754             :                             get_namespace_name(RelationGetNamespace(pri->relation)),
     755             :                             RelationGetRelationName(pri->relation), pubname),
     756             :                      errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
     757             :                                "publish_via_partition_root")));
     758             :     }
     759        1016 : }
     760             : 
     761             : /*
     762             :  * Create new publication.
     763             :  */
     764             : ObjectAddress
     765         752 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
     766             : {
     767             :     Relation    rel;
     768             :     ObjectAddress myself;
     769             :     Oid         puboid;
     770             :     bool        nulls[Natts_pg_publication];
     771             :     Datum       values[Natts_pg_publication];
     772             :     HeapTuple   tup;
     773             :     bool        publish_given;
     774             :     PublicationActions pubactions;
     775             :     bool        publish_via_partition_root_given;
     776             :     bool        publish_via_partition_root;
     777             :     bool        publish_generated_columns_given;
     778             :     bool        publish_generated_columns;
     779             :     AclResult   aclresult;
     780         752 :     List       *relations = NIL;
     781         752 :     List       *schemaidlist = NIL;
     782             : 
     783             :     /* must have CREATE privilege on database */
     784         752 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
     785         752 :     if (aclresult != ACLCHECK_OK)
     786           6 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     787           6 :                        get_database_name(MyDatabaseId));
     788             : 
     789             :     /* FOR ALL TABLES requires superuser */
     790         746 :     if (stmt->for_all_tables && !superuser())
     791           0 :         ereport(ERROR,
     792             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     793             :                  errmsg("must be superuser to create FOR ALL TABLES publication")));
     794             : 
     795         746 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     796             : 
     797             :     /* Check if name is used */
     798         746 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     799             :                              CStringGetDatum(stmt->pubname));
     800         746 :     if (OidIsValid(puboid))
     801           6 :         ereport(ERROR,
     802             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     803             :                  errmsg("publication \"%s\" already exists",
     804             :                         stmt->pubname)));
     805             : 
     806             :     /* Form a tuple. */
     807         740 :     memset(values, 0, sizeof(values));
     808         740 :     memset(nulls, false, sizeof(nulls));
     809             : 
     810         740 :     values[Anum_pg_publication_pubname - 1] =
     811         740 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
     812         740 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
     813             : 
     814         740 :     parse_publication_options(pstate,
     815             :                               stmt->options,
     816             :                               &publish_given, &pubactions,
     817             :                               &publish_via_partition_root_given,
     818             :                               &publish_via_partition_root,
     819             :                               &publish_generated_columns_given,
     820             :                               &publish_generated_columns);
     821             : 
     822         710 :     puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
     823             :                                 Anum_pg_publication_oid);
     824         710 :     values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
     825         710 :     values[Anum_pg_publication_puballtables - 1] =
     826         710 :         BoolGetDatum(stmt->for_all_tables);
     827         710 :     values[Anum_pg_publication_pubinsert - 1] =
     828         710 :         BoolGetDatum(pubactions.pubinsert);
     829         710 :     values[Anum_pg_publication_pubupdate - 1] =
     830         710 :         BoolGetDatum(pubactions.pubupdate);
     831         710 :     values[Anum_pg_publication_pubdelete - 1] =
     832         710 :         BoolGetDatum(pubactions.pubdelete);
     833         710 :     values[Anum_pg_publication_pubtruncate - 1] =
     834         710 :         BoolGetDatum(pubactions.pubtruncate);
     835         710 :     values[Anum_pg_publication_pubviaroot - 1] =
     836         710 :         BoolGetDatum(publish_via_partition_root);
     837         710 :     values[Anum_pg_publication_pubgencols - 1] =
     838         710 :         BoolGetDatum(publish_generated_columns);
     839             : 
     840         710 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     841             : 
     842             :     /* Insert tuple into catalog. */
     843         710 :     CatalogTupleInsert(rel, tup);
     844         710 :     heap_freetuple(tup);
     845             : 
     846         710 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
     847             : 
     848         710 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
     849             : 
     850             :     /* Make the changes visible. */
     851         710 :     CommandCounterIncrement();
     852             : 
     853             :     /* Associate objects with the publication. */
     854         710 :     if (stmt->for_all_tables)
     855             :     {
     856             :         /* Invalidate relcache so that publication info is rebuilt. */
     857          82 :         CacheInvalidateRelcacheAll();
     858             :     }
     859             :     else
     860             :     {
     861         628 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
     862             :                                    &schemaidlist);
     863             : 
     864             :         /* FOR TABLES IN SCHEMA requires superuser */
     865         610 :         if (schemaidlist != NIL && !superuser())
     866           6 :             ereport(ERROR,
     867             :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     868             :                     errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
     869             : 
     870         604 :         if (relations != NIL)
     871             :         {
     872             :             List       *rels;
     873             : 
     874         410 :             rels = OpenTableList(relations);
     875         386 :             TransformPubWhereClauses(rels, pstate->p_sourcetext,
     876             :                                      publish_via_partition_root);
     877             : 
     878         362 :             CheckPubRelationColumnList(stmt->pubname, rels,
     879             :                                        schemaidlist != NIL,
     880             :                                        publish_via_partition_root);
     881             : 
     882         356 :             PublicationAddTables(puboid, rels, true, NULL);
     883         330 :             CloseTableList(rels);
     884             :         }
     885             : 
     886         524 :         if (schemaidlist != NIL)
     887             :         {
     888             :             /*
     889             :              * Schema lock is held until the publication is created to prevent
     890             :              * concurrent schema deletion.
     891             :              */
     892         134 :             LockSchemaList(schemaidlist);
     893         134 :             PublicationAddSchemas(puboid, schemaidlist, true, NULL);
     894             :         }
     895             :     }
     896             : 
     897         600 :     table_close(rel, RowExclusiveLock);
     898             : 
     899         600 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
     900             : 
     901         600 :     if (wal_level != WAL_LEVEL_LOGICAL)
     902         344 :         ereport(WARNING,
     903             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     904             :                  errmsg("\"wal_level\" is insufficient to publish logical changes"),
     905             :                  errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
     906             : 
     907         600 :     return myself;
     908             : }
     909             : 
     910             : /*
     911             :  * Change options of a publication.
     912             :  */
     913             : static void
     914         116 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
     915             :                         Relation rel, HeapTuple tup)
     916             : {
     917             :     bool        nulls[Natts_pg_publication];
     918             :     bool        replaces[Natts_pg_publication];
     919             :     Datum       values[Natts_pg_publication];
     920             :     bool        publish_given;
     921             :     PublicationActions pubactions;
     922             :     bool        publish_via_partition_root_given;
     923             :     bool        publish_via_partition_root;
     924             :     bool        publish_generated_columns_given;
     925             :     bool        publish_generated_columns;
     926             :     ObjectAddress obj;
     927             :     Form_pg_publication pubform;
     928         116 :     List       *root_relids = NIL;
     929             :     ListCell   *lc;
     930             : 
     931         116 :     parse_publication_options(pstate,
     932             :                               stmt->options,
     933             :                               &publish_given, &pubactions,
     934             :                               &publish_via_partition_root_given,
     935             :                               &publish_via_partition_root,
     936             :                               &publish_generated_columns_given,
     937             :                               &publish_generated_columns);
     938             : 
     939         116 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     940             : 
     941             :     /*
     942             :      * If the publication doesn't publish changes via the root partitioned
     943             :      * table, the partition's row filter and column list will be used. So
     944             :      * disallow using WHERE clause and column lists on partitioned table in
     945             :      * this case.
     946             :      */
     947         116 :     if (!pubform->puballtables && publish_via_partition_root_given &&
     948          80 :         !publish_via_partition_root)
     949             :     {
     950             :         /*
     951             :          * Lock the publication so nobody else can do anything with it. This
     952             :          * prevents concurrent alter to add partitioned table(s) with WHERE
     953             :          * clause(s) and/or column lists which we don't allow when not
     954             :          * publishing via root.
     955             :          */
     956          48 :         LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
     957             :                            AccessShareLock);
     958             : 
     959          48 :         root_relids = GetPublicationRelations(pubform->oid,
     960             :                                               PUBLICATION_PART_ROOT);
     961             : 
     962          84 :         foreach(lc, root_relids)
     963             :         {
     964          48 :             Oid         relid = lfirst_oid(lc);
     965             :             HeapTuple   rftuple;
     966             :             char        relkind;
     967             :             char       *relname;
     968             :             bool        has_rowfilter;
     969             :             bool        has_collist;
     970             : 
     971             :             /*
     972             :              * Beware: we don't have lock on the relations, so cope silently
     973             :              * with the cache lookups returning NULL.
     974             :              */
     975             : 
     976          48 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     977             :                                       ObjectIdGetDatum(relid),
     978             :                                       ObjectIdGetDatum(pubform->oid));
     979          48 :             if (!HeapTupleIsValid(rftuple))
     980           0 :                 continue;
     981          48 :             has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
     982          48 :             has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
     983          48 :             if (!has_rowfilter && !has_collist)
     984             :             {
     985          12 :                 ReleaseSysCache(rftuple);
     986          12 :                 continue;
     987             :             }
     988             : 
     989          36 :             relkind = get_rel_relkind(relid);
     990          36 :             if (relkind != RELKIND_PARTITIONED_TABLE)
     991             :             {
     992          24 :                 ReleaseSysCache(rftuple);
     993          24 :                 continue;
     994             :             }
     995          12 :             relname = get_rel_name(relid);
     996          12 :             if (relname == NULL)    /* table concurrently dropped */
     997             :             {
     998           0 :                 ReleaseSysCache(rftuple);
     999           0 :                 continue;
    1000             :             }
    1001             : 
    1002          12 :             if (has_rowfilter)
    1003           6 :                 ereport(ERROR,
    1004             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1005             :                          errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1006             :                                 "publish_via_partition_root",
    1007             :                                 stmt->pubname),
    1008             :                          errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1009             :                                    relname, "publish_via_partition_root")));
    1010             :             Assert(has_collist);
    1011           6 :             ereport(ERROR,
    1012             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1013             :                      errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1014             :                             "publish_via_partition_root",
    1015             :                             stmt->pubname),
    1016             :                      errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1017             :                                relname, "publish_via_partition_root")));
    1018             :         }
    1019             :     }
    1020             : 
    1021             :     /* Everything ok, form a new tuple. */
    1022         104 :     memset(values, 0, sizeof(values));
    1023         104 :     memset(nulls, false, sizeof(nulls));
    1024         104 :     memset(replaces, false, sizeof(replaces));
    1025             : 
    1026         104 :     if (publish_given)
    1027             :     {
    1028          28 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
    1029          28 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
    1030             : 
    1031          28 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
    1032          28 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
    1033             : 
    1034          28 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
    1035          28 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
    1036             : 
    1037          28 :         values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
    1038          28 :         replaces[Anum_pg_publication_pubtruncate - 1] = true;
    1039             :     }
    1040             : 
    1041         104 :     if (publish_via_partition_root_given)
    1042             :     {
    1043          70 :         values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
    1044          70 :         replaces[Anum_pg_publication_pubviaroot - 1] = true;
    1045             :     }
    1046             : 
    1047         104 :     if (publish_generated_columns_given)
    1048             :     {
    1049           6 :         values[Anum_pg_publication_pubgencols - 1] = BoolGetDatum(publish_generated_columns);
    1050           6 :         replaces[Anum_pg_publication_pubgencols - 1] = true;
    1051             :     }
    1052             : 
    1053         104 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1054             :                             replaces);
    1055             : 
    1056             :     /* Update the catalog. */
    1057         104 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1058             : 
    1059         104 :     CommandCounterIncrement();
    1060             : 
    1061         104 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1062             : 
    1063             :     /* Invalidate the relcache. */
    1064         104 :     if (pubform->puballtables)
    1065             :     {
    1066           8 :         CacheInvalidateRelcacheAll();
    1067             :     }
    1068             :     else
    1069             :     {
    1070          96 :         List       *relids = NIL;
    1071          96 :         List       *schemarelids = NIL;
    1072             : 
    1073             :         /*
    1074             :          * For any partitioned tables contained in the publication, we must
    1075             :          * invalidate all partitions contained in the respective partition
    1076             :          * trees, not just those explicitly mentioned in the publication.
    1077             :          */
    1078          96 :         if (root_relids == NIL)
    1079          60 :             relids = GetPublicationRelations(pubform->oid,
    1080             :                                              PUBLICATION_PART_ALL);
    1081             :         else
    1082             :         {
    1083             :             /*
    1084             :              * We already got tables explicitly mentioned in the publication.
    1085             :              * Now get all partitions for the partitioned table in the list.
    1086             :              */
    1087          72 :             foreach(lc, root_relids)
    1088          36 :                 relids = GetPubPartitionOptionRelations(relids,
    1089             :                                                         PUBLICATION_PART_ALL,
    1090             :                                                         lfirst_oid(lc));
    1091             :         }
    1092             : 
    1093          96 :         schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
    1094             :                                                         PUBLICATION_PART_ALL);
    1095          96 :         relids = list_concat_unique_oid(relids, schemarelids);
    1096             : 
    1097          96 :         InvalidatePublicationRels(relids);
    1098             :     }
    1099             : 
    1100         104 :     ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
    1101         104 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1102             :                                      (Node *) stmt);
    1103             : 
    1104         104 :     InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
    1105         104 : }
    1106             : 
    1107             : /*
    1108             :  * Invalidate the relations.
    1109             :  */
    1110             : void
    1111        2282 : InvalidatePublicationRels(List *relids)
    1112             : {
    1113             :     /*
    1114             :      * We don't want to send too many individual messages, at some point it's
    1115             :      * cheaper to just reset whole relcache.
    1116             :      */
    1117        2282 :     if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
    1118             :     {
    1119             :         ListCell   *lc;
    1120             : 
    1121       19096 :         foreach(lc, relids)
    1122       16814 :             CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
    1123             :     }
    1124             :     else
    1125           0 :         CacheInvalidateRelcacheAll();
    1126        2282 : }
    1127             : 
    1128             : /*
    1129             :  * Add or remove table to/from publication.
    1130             :  */
    1131             : static void
    1132         884 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
    1133             :                        List *tables, const char *queryString,
    1134             :                        bool publish_schema)
    1135             : {
    1136         884 :     List       *rels = NIL;
    1137         884 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1138         884 :     Oid         pubid = pubform->oid;
    1139             : 
    1140             :     /*
    1141             :      * Nothing to do if no objects, except in SET: for that it is quite
    1142             :      * possible that user has not specified any tables in which case we need
    1143             :      * to remove all the existing tables.
    1144             :      */
    1145         884 :     if (!tables && stmt->action != AP_SetObjects)
    1146          66 :         return;
    1147             : 
    1148         818 :     rels = OpenTableList(tables);
    1149             : 
    1150         818 :     if (stmt->action == AP_AddObjects)
    1151             :     {
    1152         274 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1153             : 
    1154         256 :         publish_schema |= is_schema_publication(pubid);
    1155             : 
    1156         256 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1157         256 :                                    pubform->pubviaroot);
    1158             : 
    1159         244 :         PublicationAddTables(pubid, rels, false, stmt);
    1160             :     }
    1161         544 :     else if (stmt->action == AP_DropObjects)
    1162          98 :         PublicationDropTables(pubid, rels, false);
    1163             :     else                        /* AP_SetObjects */
    1164             :     {
    1165         446 :         List       *oldrelids = GetPublicationRelations(pubid,
    1166             :                                                         PUBLICATION_PART_ROOT);
    1167         446 :         List       *delrels = NIL;
    1168             :         ListCell   *oldlc;
    1169             : 
    1170         446 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1171             : 
    1172         428 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1173         428 :                                    pubform->pubviaroot);
    1174             : 
    1175             :         /*
    1176             :          * To recreate the relation list for the publication, look for
    1177             :          * existing relations that do not need to be dropped.
    1178             :          */
    1179         806 :         foreach(oldlc, oldrelids)
    1180             :         {
    1181         402 :             Oid         oldrelid = lfirst_oid(oldlc);
    1182             :             ListCell   *newlc;
    1183             :             PublicationRelInfo *oldrel;
    1184         402 :             bool        found = false;
    1185             :             HeapTuple   rftuple;
    1186         402 :             Node       *oldrelwhereclause = NULL;
    1187         402 :             Bitmapset  *oldcolumns = NULL;
    1188             : 
    1189             :             /* look up the cache for the old relmap */
    1190         402 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1191             :                                       ObjectIdGetDatum(oldrelid),
    1192             :                                       ObjectIdGetDatum(pubid));
    1193             : 
    1194             :             /*
    1195             :              * See if the existing relation currently has a WHERE clause or a
    1196             :              * column list. We need to compare those too.
    1197             :              */
    1198         402 :             if (HeapTupleIsValid(rftuple))
    1199             :             {
    1200         402 :                 bool        isnull = true;
    1201             :                 Datum       whereClauseDatum;
    1202             :                 Datum       columnListDatum;
    1203             : 
    1204             :                 /* Load the WHERE clause for this table. */
    1205         402 :                 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1206             :                                                    Anum_pg_publication_rel_prqual,
    1207             :                                                    &isnull);
    1208         402 :                 if (!isnull)
    1209         210 :                     oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
    1210             : 
    1211             :                 /* Transform the int2vector column list to a bitmap. */
    1212         402 :                 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1213             :                                                   Anum_pg_publication_rel_prattrs,
    1214             :                                                   &isnull);
    1215             : 
    1216         402 :                 if (!isnull)
    1217         136 :                     oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
    1218             : 
    1219         402 :                 ReleaseSysCache(rftuple);
    1220             :             }
    1221             : 
    1222         778 :             foreach(newlc, rels)
    1223             :             {
    1224             :                 PublicationRelInfo *newpubrel;
    1225             :                 Oid         newrelid;
    1226         404 :                 Bitmapset  *newcolumns = NULL;
    1227             : 
    1228         404 :                 newpubrel = (PublicationRelInfo *) lfirst(newlc);
    1229         404 :                 newrelid = RelationGetRelid(newpubrel->relation);
    1230             : 
    1231             :                 /*
    1232             :                  * Validate the column list.  If the column list or WHERE
    1233             :                  * clause changes, then the validation done here will be
    1234             :                  * duplicated inside PublicationAddTables().  The validation
    1235             :                  * is cheap enough that that seems harmless.
    1236             :                  */
    1237         404 :                 newcolumns = pub_collist_validate(newpubrel->relation,
    1238             :                                                   newpubrel->columns);
    1239             : 
    1240             :                 /*
    1241             :                  * Check if any of the new set of relations matches with the
    1242             :                  * existing relations in the publication. Additionally, if the
    1243             :                  * relation has an associated WHERE clause, check the WHERE
    1244             :                  * expressions also match. Same for the column list. Drop the
    1245             :                  * rest.
    1246             :                  */
    1247         392 :                 if (newrelid == oldrelid)
    1248             :                 {
    1249         284 :                     if (equal(oldrelwhereclause, newpubrel->whereClause) &&
    1250          80 :                         bms_equal(oldcolumns, newcolumns))
    1251             :                     {
    1252          16 :                         found = true;
    1253          16 :                         break;
    1254             :                     }
    1255             :                 }
    1256             :             }
    1257             : 
    1258             :             /*
    1259             :              * Add the non-matched relations to a list so that they can be
    1260             :              * dropped.
    1261             :              */
    1262         390 :             if (!found)
    1263             :             {
    1264         374 :                 oldrel = palloc(sizeof(PublicationRelInfo));
    1265         374 :                 oldrel->whereClause = NULL;
    1266         374 :                 oldrel->columns = NIL;
    1267         374 :                 oldrel->relation = table_open(oldrelid,
    1268             :                                               ShareUpdateExclusiveLock);
    1269         374 :                 delrels = lappend(delrels, oldrel);
    1270             :             }
    1271             :         }
    1272             : 
    1273             :         /* And drop them. */
    1274         404 :         PublicationDropTables(pubid, delrels, true);
    1275             : 
    1276             :         /*
    1277             :          * Don't bother calculating the difference for adding, we'll catch and
    1278             :          * skip existing ones when doing catalog update.
    1279             :          */
    1280         404 :         PublicationAddTables(pubid, rels, true, stmt);
    1281             : 
    1282         404 :         CloseTableList(delrels);
    1283             :     }
    1284             : 
    1285         692 :     CloseTableList(rels);
    1286             : }
    1287             : 
    1288             : /*
    1289             :  * Alter the publication schemas.
    1290             :  *
    1291             :  * Add or remove schemas to/from publication.
    1292             :  */
    1293             : static void
    1294         758 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
    1295             :                         HeapTuple tup, List *schemaidlist)
    1296             : {
    1297         758 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1298             : 
    1299             :     /*
    1300             :      * Nothing to do if no objects, except in SET: for that it is quite
    1301             :      * possible that user has not specified any schemas in which case we need
    1302             :      * to remove all the existing schemas.
    1303             :      */
    1304         758 :     if (!schemaidlist && stmt->action != AP_SetObjects)
    1305         288 :         return;
    1306             : 
    1307             :     /*
    1308             :      * Schema lock is held until the publication is altered to prevent
    1309             :      * concurrent schema deletion.
    1310             :      */
    1311         470 :     LockSchemaList(schemaidlist);
    1312         470 :     if (stmt->action == AP_AddObjects)
    1313             :     {
    1314             :         ListCell   *lc;
    1315             :         List       *reloids;
    1316             : 
    1317          28 :         reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
    1318             : 
    1319          36 :         foreach(lc, reloids)
    1320             :         {
    1321             :             HeapTuple   coltuple;
    1322             : 
    1323          14 :             coltuple = SearchSysCache2(PUBLICATIONRELMAP,
    1324             :                                        ObjectIdGetDatum(lfirst_oid(lc)),
    1325             :                                        ObjectIdGetDatum(pubform->oid));
    1326             : 
    1327          14 :             if (!HeapTupleIsValid(coltuple))
    1328           0 :                 continue;
    1329             : 
    1330             :             /*
    1331             :              * Disallow adding schema if column list is already part of the
    1332             :              * publication. See CheckPubRelationColumnList.
    1333             :              */
    1334          14 :             if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
    1335           6 :                 ereport(ERROR,
    1336             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1337             :                         errmsg("cannot add schema to publication \"%s\"",
    1338             :                                stmt->pubname),
    1339             :                         errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
    1340             : 
    1341           8 :             ReleaseSysCache(coltuple);
    1342             :         }
    1343             : 
    1344          22 :         PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
    1345             :     }
    1346         442 :     else if (stmt->action == AP_DropObjects)
    1347          38 :         PublicationDropSchemas(pubform->oid, schemaidlist, false);
    1348             :     else                        /* AP_SetObjects */
    1349             :     {
    1350         404 :         List       *oldschemaids = GetPublicationSchemas(pubform->oid);
    1351         404 :         List       *delschemas = NIL;
    1352             : 
    1353             :         /* Identify which schemas should be dropped */
    1354         404 :         delschemas = list_difference_oid(oldschemaids, schemaidlist);
    1355             : 
    1356             :         /*
    1357             :          * Schema lock is held until the publication is altered to prevent
    1358             :          * concurrent schema deletion.
    1359             :          */
    1360         404 :         LockSchemaList(delschemas);
    1361             : 
    1362             :         /* And drop them */
    1363         404 :         PublicationDropSchemas(pubform->oid, delschemas, true);
    1364             : 
    1365             :         /*
    1366             :          * Don't bother calculating the difference for adding, we'll catch and
    1367             :          * skip existing ones when doing catalog update.
    1368             :          */
    1369         404 :         PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
    1370             :     }
    1371             : }
    1372             : 
    1373             : /*
    1374             :  * Check if relations and schemas can be in a given publication and throw
    1375             :  * appropriate error if not.
    1376             :  */
    1377             : static void
    1378         926 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
    1379             :                       List *tables, List *schemaidlist)
    1380             : {
    1381         926 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1382             : 
    1383         926 :     if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
    1384          94 :         schemaidlist && !superuser())
    1385           6 :         ereport(ERROR,
    1386             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1387             :                  errmsg("must be superuser to add or set schemas")));
    1388             : 
    1389             :     /*
    1390             :      * Check that user is allowed to manipulate the publication tables in
    1391             :      * schema
    1392             :      */
    1393         920 :     if (schemaidlist && pubform->puballtables)
    1394          18 :         ereport(ERROR,
    1395             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1396             :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1397             :                         NameStr(pubform->pubname)),
    1398             :                  errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
    1399             : 
    1400             :     /* Check that user is allowed to manipulate the publication tables. */
    1401         902 :     if (tables && pubform->puballtables)
    1402          18 :         ereport(ERROR,
    1403             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1404             :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1405             :                         NameStr(pubform->pubname)),
    1406             :                  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
    1407         884 : }
    1408             : 
    1409             : /*
    1410             :  * Alter the existing publication.
    1411             :  *
    1412             :  * This is dispatcher function for AlterPublicationOptions,
    1413             :  * AlterPublicationSchemas and AlterPublicationTables.
    1414             :  */
    1415             : void
    1416        1060 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
    1417             : {
    1418             :     Relation    rel;
    1419             :     HeapTuple   tup;
    1420             :     Form_pg_publication pubform;
    1421             : 
    1422        1060 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1423             : 
    1424        1060 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
    1425             :                               CStringGetDatum(stmt->pubname));
    1426             : 
    1427        1060 :     if (!HeapTupleIsValid(tup))
    1428           0 :         ereport(ERROR,
    1429             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1430             :                  errmsg("publication \"%s\" does not exist",
    1431             :                         stmt->pubname)));
    1432             : 
    1433        1060 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1434             : 
    1435             :     /* must be owner */
    1436        1060 :     if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
    1437           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1438           0 :                        stmt->pubname);
    1439             : 
    1440        1060 :     if (stmt->options)
    1441         116 :         AlterPublicationOptions(pstate, stmt, rel, tup);
    1442             :     else
    1443             :     {
    1444         944 :         List       *relations = NIL;
    1445         944 :         List       *schemaidlist = NIL;
    1446         944 :         Oid         pubid = pubform->oid;
    1447             : 
    1448         944 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
    1449             :                                    &schemaidlist);
    1450             : 
    1451         926 :         CheckAlterPublication(stmt, tup, relations, schemaidlist);
    1452             : 
    1453         884 :         heap_freetuple(tup);
    1454             : 
    1455             :         /* Lock the publication so nobody else can do anything with it. */
    1456         884 :         LockDatabaseObject(PublicationRelationId, pubid, 0,
    1457             :                            AccessExclusiveLock);
    1458             : 
    1459             :         /*
    1460             :          * It is possible that by the time we acquire the lock on publication,
    1461             :          * concurrent DDL has removed it. We can test this by checking the
    1462             :          * existence of publication. We get the tuple again to avoid the risk
    1463             :          * of any publication option getting changed.
    1464             :          */
    1465         884 :         tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1466         884 :         if (!HeapTupleIsValid(tup))
    1467           0 :             ereport(ERROR,
    1468             :                     errcode(ERRCODE_UNDEFINED_OBJECT),
    1469             :                     errmsg("publication \"%s\" does not exist",
    1470             :                            stmt->pubname));
    1471             : 
    1472         884 :         AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
    1473             :                                schemaidlist != NIL);
    1474         758 :         AlterPublicationSchemas(stmt, tup, schemaidlist);
    1475             :     }
    1476             : 
    1477             :     /* Cleanup. */
    1478         844 :     heap_freetuple(tup);
    1479         844 :     table_close(rel, RowExclusiveLock);
    1480         844 : }
    1481             : 
    1482             : /*
    1483             :  * Remove relation from publication by mapping OID.
    1484             :  */
    1485             : void
    1486         810 : RemovePublicationRelById(Oid proid)
    1487             : {
    1488             :     Relation    rel;
    1489             :     HeapTuple   tup;
    1490             :     Form_pg_publication_rel pubrel;
    1491         810 :     List       *relids = NIL;
    1492             : 
    1493         810 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
    1494             : 
    1495         810 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
    1496             : 
    1497         810 :     if (!HeapTupleIsValid(tup))
    1498           0 :         elog(ERROR, "cache lookup failed for publication table %u",
    1499             :              proid);
    1500             : 
    1501         810 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
    1502             : 
    1503             :     /*
    1504             :      * Invalidate relcache so that publication info is rebuilt.
    1505             :      *
    1506             :      * For the partitioned tables, we must invalidate all partitions contained
    1507             :      * in the respective partition hierarchies, not just the one explicitly
    1508             :      * mentioned in the publication. This is required because we implicitly
    1509             :      * publish the child tables when the parent table is published.
    1510             :      */
    1511         810 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
    1512             :                                             pubrel->prrelid);
    1513             : 
    1514         810 :     InvalidatePublicationRels(relids);
    1515             : 
    1516         810 :     CatalogTupleDelete(rel, &tup->t_self);
    1517             : 
    1518         810 :     ReleaseSysCache(tup);
    1519             : 
    1520         810 :     table_close(rel, RowExclusiveLock);
    1521         810 : }
    1522             : 
    1523             : /*
    1524             :  * Remove the publication by mapping OID.
    1525             :  */
    1526             : void
    1527         424 : RemovePublicationById(Oid pubid)
    1528             : {
    1529             :     Relation    rel;
    1530             :     HeapTuple   tup;
    1531             :     Form_pg_publication pubform;
    1532             : 
    1533         424 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1534             : 
    1535         424 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1536         424 :     if (!HeapTupleIsValid(tup))
    1537           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1538             : 
    1539         424 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1540             : 
    1541             :     /* Invalidate relcache so that publication info is rebuilt. */
    1542         424 :     if (pubform->puballtables)
    1543          46 :         CacheInvalidateRelcacheAll();
    1544             : 
    1545         424 :     CatalogTupleDelete(rel, &tup->t_self);
    1546             : 
    1547         424 :     ReleaseSysCache(tup);
    1548             : 
    1549         424 :     table_close(rel, RowExclusiveLock);
    1550         424 : }
    1551             : 
    1552             : /*
    1553             :  * Remove schema from publication by mapping OID.
    1554             :  */
    1555             : void
    1556         192 : RemovePublicationSchemaById(Oid psoid)
    1557             : {
    1558             :     Relation    rel;
    1559             :     HeapTuple   tup;
    1560         192 :     List       *schemaRels = NIL;
    1561             :     Form_pg_publication_namespace pubsch;
    1562             : 
    1563         192 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
    1564             : 
    1565         192 :     tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
    1566             : 
    1567         192 :     if (!HeapTupleIsValid(tup))
    1568           0 :         elog(ERROR, "cache lookup failed for publication schema %u", psoid);
    1569             : 
    1570         192 :     pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
    1571             : 
    1572             :     /*
    1573             :      * Invalidate relcache so that publication info is rebuilt. See
    1574             :      * RemovePublicationRelById for why we need to consider all the
    1575             :      * partitions.
    1576             :      */
    1577         192 :     schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
    1578             :                                                PUBLICATION_PART_ALL);
    1579         192 :     InvalidatePublicationRels(schemaRels);
    1580             : 
    1581         192 :     CatalogTupleDelete(rel, &tup->t_self);
    1582             : 
    1583         192 :     ReleaseSysCache(tup);
    1584             : 
    1585         192 :     table_close(rel, RowExclusiveLock);
    1586         192 : }
    1587             : 
    1588             : /*
    1589             :  * Open relations specified by a PublicationTable list.
    1590             :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
    1591             :  * add them to a publication.
    1592             :  */
    1593             : static List *
    1594        1228 : OpenTableList(List *tables)
    1595             : {
    1596        1228 :     List       *relids = NIL;
    1597        1228 :     List       *rels = NIL;
    1598             :     ListCell   *lc;
    1599        1228 :     List       *relids_with_rf = NIL;
    1600        1228 :     List       *relids_with_collist = NIL;
    1601             : 
    1602             :     /*
    1603             :      * Open, share-lock, and check all the explicitly-specified relations
    1604             :      */
    1605        2514 :     foreach(lc, tables)
    1606             :     {
    1607        1310 :         PublicationTable *t = lfirst_node(PublicationTable, lc);
    1608        1310 :         bool        recurse = t->relation->inh;
    1609             :         Relation    rel;
    1610             :         Oid         myrelid;
    1611             :         PublicationRelInfo *pub_rel;
    1612             : 
    1613             :         /* Allow query cancel in case this takes a long time */
    1614        1310 :         CHECK_FOR_INTERRUPTS();
    1615             : 
    1616        1310 :         rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
    1617        1310 :         myrelid = RelationGetRelid(rel);
    1618             : 
    1619             :         /*
    1620             :          * Filter out duplicates if user specifies "foo, foo".
    1621             :          *
    1622             :          * Note that this algorithm is known to not be very efficient (O(N^2))
    1623             :          * but given that it only works on list of tables given to us by user
    1624             :          * it's deemed acceptable.
    1625             :          */
    1626        1310 :         if (list_member_oid(relids, myrelid))
    1627             :         {
    1628             :             /* Disallow duplicate tables if there are any with row filters. */
    1629          24 :             if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
    1630          12 :                 ereport(ERROR,
    1631             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1632             :                          errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1633             :                                 RelationGetRelationName(rel))));
    1634             : 
    1635             :             /* Disallow duplicate tables if there are any with column lists. */
    1636          12 :             if (t->columns || list_member_oid(relids_with_collist, myrelid))
    1637          12 :                 ereport(ERROR,
    1638             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1639             :                          errmsg("conflicting or redundant column lists for table \"%s\"",
    1640             :                                 RelationGetRelationName(rel))));
    1641             : 
    1642           0 :             table_close(rel, ShareUpdateExclusiveLock);
    1643           0 :             continue;
    1644             :         }
    1645             : 
    1646        1286 :         pub_rel = palloc(sizeof(PublicationRelInfo));
    1647        1286 :         pub_rel->relation = rel;
    1648        1286 :         pub_rel->whereClause = t->whereClause;
    1649        1286 :         pub_rel->columns = t->columns;
    1650        1286 :         rels = lappend(rels, pub_rel);
    1651        1286 :         relids = lappend_oid(relids, myrelid);
    1652             : 
    1653        1286 :         if (t->whereClause)
    1654         372 :             relids_with_rf = lappend_oid(relids_with_rf, myrelid);
    1655             : 
    1656        1286 :         if (t->columns)
    1657         380 :             relids_with_collist = lappend_oid(relids_with_collist, myrelid);
    1658             : 
    1659             :         /*
    1660             :          * Add children of this rel, if requested, so that they too are added
    1661             :          * to the publication.  A partitioned table can't have any inheritance
    1662             :          * children other than its partitions, which need not be explicitly
    1663             :          * added to the publication.
    1664             :          */
    1665        1286 :         if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
    1666             :         {
    1667             :             List       *children;
    1668             :             ListCell   *child;
    1669             : 
    1670        1104 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
    1671             :                                            NULL);
    1672             : 
    1673        2216 :             foreach(child, children)
    1674             :             {
    1675        1112 :                 Oid         childrelid = lfirst_oid(child);
    1676             : 
    1677             :                 /* Allow query cancel in case this takes a long time */
    1678        1112 :                 CHECK_FOR_INTERRUPTS();
    1679             : 
    1680             :                 /*
    1681             :                  * Skip duplicates if user specified both parent and child
    1682             :                  * tables.
    1683             :                  */
    1684        1112 :                 if (list_member_oid(relids, childrelid))
    1685             :                 {
    1686             :                     /*
    1687             :                      * We don't allow to specify row filter for both parent
    1688             :                      * and child table at the same time as it is not very
    1689             :                      * clear which one should be given preference.
    1690             :                      */
    1691        1104 :                     if (childrelid != myrelid &&
    1692           0 :                         (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
    1693           0 :                         ereport(ERROR,
    1694             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1695             :                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1696             :                                         RelationGetRelationName(rel))));
    1697             : 
    1698             :                     /*
    1699             :                      * We don't allow to specify column list for both parent
    1700             :                      * and child table at the same time as it is not very
    1701             :                      * clear which one should be given preference.
    1702             :                      */
    1703        1104 :                     if (childrelid != myrelid &&
    1704           0 :                         (t->columns || list_member_oid(relids_with_collist, childrelid)))
    1705           0 :                         ereport(ERROR,
    1706             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1707             :                                  errmsg("conflicting or redundant column lists for table \"%s\"",
    1708             :                                         RelationGetRelationName(rel))));
    1709             : 
    1710        1104 :                     continue;
    1711             :                 }
    1712             : 
    1713             :                 /* find_all_inheritors already got lock */
    1714           8 :                 rel = table_open(childrelid, NoLock);
    1715           8 :                 pub_rel = palloc(sizeof(PublicationRelInfo));
    1716           8 :                 pub_rel->relation = rel;
    1717             :                 /* child inherits WHERE clause from parent */
    1718           8 :                 pub_rel->whereClause = t->whereClause;
    1719             : 
    1720             :                 /* child inherits column list from parent */
    1721           8 :                 pub_rel->columns = t->columns;
    1722           8 :                 rels = lappend(rels, pub_rel);
    1723           8 :                 relids = lappend_oid(relids, childrelid);
    1724             : 
    1725           8 :                 if (t->whereClause)
    1726           2 :                     relids_with_rf = lappend_oid(relids_with_rf, childrelid);
    1727             : 
    1728           8 :                 if (t->columns)
    1729           0 :                     relids_with_collist = lappend_oid(relids_with_collist, childrelid);
    1730             :             }
    1731             :         }
    1732             :     }
    1733             : 
    1734        1204 :     list_free(relids);
    1735        1204 :     list_free(relids_with_rf);
    1736             : 
    1737        1204 :     return rels;
    1738             : }
    1739             : 
    1740             : /*
    1741             :  * Close all relations in the list.
    1742             :  */
    1743             : static void
    1744        1426 : CloseTableList(List *rels)
    1745             : {
    1746             :     ListCell   *lc;
    1747             : 
    1748        2888 :     foreach(lc, rels)
    1749             :     {
    1750             :         PublicationRelInfo *pub_rel;
    1751             : 
    1752        1462 :         pub_rel = (PublicationRelInfo *) lfirst(lc);
    1753        1462 :         table_close(pub_rel->relation, NoLock);
    1754             :     }
    1755             : 
    1756        1426 :     list_free_deep(rels);
    1757        1426 : }
    1758             : 
    1759             : /*
    1760             :  * Lock the schemas specified in the schema list in AccessShareLock mode in
    1761             :  * order to prevent concurrent schema deletion.
    1762             :  */
    1763             : static void
    1764        1008 : LockSchemaList(List *schemalist)
    1765             : {
    1766             :     ListCell   *lc;
    1767             : 
    1768        1292 :     foreach(lc, schemalist)
    1769             :     {
    1770         284 :         Oid         schemaid = lfirst_oid(lc);
    1771             : 
    1772             :         /* Allow query cancel in case this takes a long time */
    1773         284 :         CHECK_FOR_INTERRUPTS();
    1774         284 :         LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
    1775             : 
    1776             :         /*
    1777             :          * It is possible that by the time we acquire the lock on schema,
    1778             :          * concurrent DDL has removed it. We can test this by checking the
    1779             :          * existence of schema.
    1780             :          */
    1781         284 :         if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
    1782           0 :             ereport(ERROR,
    1783             :                     errcode(ERRCODE_UNDEFINED_SCHEMA),
    1784             :                     errmsg("schema with OID %u does not exist", schemaid));
    1785             :     }
    1786        1008 : }
    1787             : 
    1788             : /*
    1789             :  * Add listed tables to the publication.
    1790             :  */
    1791             : static void
    1792        1004 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
    1793             :                      AlterPublicationStmt *stmt)
    1794             : {
    1795             :     ListCell   *lc;
    1796             : 
    1797             :     Assert(!stmt || !stmt->for_all_tables);
    1798             : 
    1799        2006 :     foreach(lc, rels)
    1800             :     {
    1801        1064 :         PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
    1802        1064 :         Relation    rel = pub_rel->relation;
    1803             :         ObjectAddress obj;
    1804             : 
    1805             :         /* Must be owner of the table or superuser. */
    1806        1064 :         if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
    1807           6 :             aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
    1808           6 :                            RelationGetRelationName(rel));
    1809             : 
    1810        1058 :         obj = publication_add_relation(pubid, pub_rel, if_not_exists);
    1811        1002 :         if (stmt)
    1812             :         {
    1813         612 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1814             :                                              (Node *) stmt);
    1815             : 
    1816         612 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
    1817             :                                        obj.objectId, 0);
    1818             :         }
    1819             :     }
    1820         942 : }
    1821             : 
    1822             : /*
    1823             :  * Remove listed tables from the publication.
    1824             :  */
    1825             : static void
    1826         502 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
    1827             : {
    1828             :     ObjectAddress obj;
    1829             :     ListCell   *lc;
    1830             :     Oid         prid;
    1831             : 
    1832         962 :     foreach(lc, rels)
    1833             :     {
    1834         478 :         PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
    1835         478 :         Relation    rel = pubrel->relation;
    1836         478 :         Oid         relid = RelationGetRelid(rel);
    1837             : 
    1838         478 :         if (pubrel->columns)
    1839           0 :             ereport(ERROR,
    1840             :                     errcode(ERRCODE_SYNTAX_ERROR),
    1841             :                     errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
    1842             : 
    1843         478 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
    1844             :                                ObjectIdGetDatum(relid),
    1845             :                                ObjectIdGetDatum(pubid));
    1846         478 :         if (!OidIsValid(prid))
    1847             :         {
    1848          12 :             if (missing_ok)
    1849           0 :                 continue;
    1850             : 
    1851          12 :             ereport(ERROR,
    1852             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1853             :                      errmsg("relation \"%s\" is not part of the publication",
    1854             :                             RelationGetRelationName(rel))));
    1855             :         }
    1856             : 
    1857         466 :         if (pubrel->whereClause)
    1858           6 :             ereport(ERROR,
    1859             :                     (errcode(ERRCODE_SYNTAX_ERROR),
    1860             :                      errmsg("cannot use a WHERE clause when removing a table from a publication")));
    1861             : 
    1862         460 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
    1863         460 :         performDeletion(&obj, DROP_CASCADE, 0);
    1864             :     }
    1865         484 : }
    1866             : 
    1867             : /*
    1868             :  * Add listed schemas to the publication.
    1869             :  */
    1870             : static void
    1871         560 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
    1872             :                       AlterPublicationStmt *stmt)
    1873             : {
    1874             :     ListCell   *lc;
    1875             : 
    1876             :     Assert(!stmt || !stmt->for_all_tables);
    1877             : 
    1878         770 :     foreach(lc, schemas)
    1879             :     {
    1880         222 :         Oid         schemaid = lfirst_oid(lc);
    1881             :         ObjectAddress obj;
    1882             : 
    1883         222 :         obj = publication_add_schema(pubid, schemaid, if_not_exists);
    1884         210 :         if (stmt)
    1885             :         {
    1886          58 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1887             :                                              (Node *) stmt);
    1888             : 
    1889          58 :             InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
    1890             :                                        obj.objectId, 0);
    1891             :         }
    1892             :     }
    1893         548 : }
    1894             : 
    1895             : /*
    1896             :  * Remove listed schemas from the publication.
    1897             :  */
    1898             : static void
    1899         442 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
    1900             : {
    1901             :     ObjectAddress obj;
    1902             :     ListCell   *lc;
    1903             :     Oid         psid;
    1904             : 
    1905         492 :     foreach(lc, schemas)
    1906             :     {
    1907          56 :         Oid         schemaid = lfirst_oid(lc);
    1908             : 
    1909          56 :         psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
    1910             :                                Anum_pg_publication_namespace_oid,
    1911             :                                ObjectIdGetDatum(schemaid),
    1912             :                                ObjectIdGetDatum(pubid));
    1913          56 :         if (!OidIsValid(psid))
    1914             :         {
    1915           6 :             if (missing_ok)
    1916           0 :                 continue;
    1917             : 
    1918           6 :             ereport(ERROR,
    1919             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1920             :                      errmsg("tables from schema \"%s\" are not part of the publication",
    1921             :                             get_namespace_name(schemaid))));
    1922             :         }
    1923             : 
    1924          50 :         ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
    1925          50 :         performDeletion(&obj, DROP_CASCADE, 0);
    1926             :     }
    1927         436 : }
    1928             : 
    1929             : /*
    1930             :  * Internal workhorse for changing a publication owner
    1931             :  */
    1932             : static void
    1933          26 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    1934             : {
    1935             :     Form_pg_publication form;
    1936             : 
    1937          26 :     form = (Form_pg_publication) GETSTRUCT(tup);
    1938             : 
    1939          26 :     if (form->pubowner == newOwnerId)
    1940           2 :         return;
    1941             : 
    1942          24 :     if (!superuser())
    1943             :     {
    1944             :         AclResult   aclresult;
    1945             : 
    1946             :         /* Must be owner */
    1947          12 :         if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
    1948           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1949           0 :                            NameStr(form->pubname));
    1950             : 
    1951             :         /* Must be able to become new owner */
    1952          12 :         check_can_set_role(GetUserId(), newOwnerId);
    1953             : 
    1954             :         /* New owner must have CREATE privilege on database */
    1955          12 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
    1956          12 :         if (aclresult != ACLCHECK_OK)
    1957           0 :             aclcheck_error(aclresult, OBJECT_DATABASE,
    1958           0 :                            get_database_name(MyDatabaseId));
    1959             : 
    1960          12 :         if (form->puballtables && !superuser_arg(newOwnerId))
    1961           0 :             ereport(ERROR,
    1962             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1963             :                      errmsg("permission denied to change owner of publication \"%s\"",
    1964             :                             NameStr(form->pubname)),
    1965             :                      errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
    1966             : 
    1967          12 :         if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
    1968           6 :             ereport(ERROR,
    1969             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1970             :                      errmsg("permission denied to change owner of publication \"%s\"",
    1971             :                             NameStr(form->pubname)),
    1972             :                      errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
    1973             :     }
    1974             : 
    1975          18 :     form->pubowner = newOwnerId;
    1976          18 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1977             : 
    1978             :     /* Update owner dependency reference */
    1979          18 :     changeDependencyOnOwner(PublicationRelationId,
    1980             :                             form->oid,
    1981             :                             newOwnerId);
    1982             : 
    1983          18 :     InvokeObjectPostAlterHook(PublicationRelationId,
    1984             :                               form->oid, 0);
    1985             : }
    1986             : 
    1987             : /*
    1988             :  * Change publication owner -- by name
    1989             :  */
    1990             : ObjectAddress
    1991          26 : AlterPublicationOwner(const char *name, Oid newOwnerId)
    1992             : {
    1993             :     Oid         subid;
    1994             :     HeapTuple   tup;
    1995             :     Relation    rel;
    1996             :     ObjectAddress address;
    1997             :     Form_pg_publication pubform;
    1998             : 
    1999          26 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2000             : 
    2001          26 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
    2002             : 
    2003          26 :     if (!HeapTupleIsValid(tup))
    2004           0 :         ereport(ERROR,
    2005             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2006             :                  errmsg("publication \"%s\" does not exist", name)));
    2007             : 
    2008          26 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    2009          26 :     subid = pubform->oid;
    2010             : 
    2011          26 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2012             : 
    2013          20 :     ObjectAddressSet(address, PublicationRelationId, subid);
    2014             : 
    2015          20 :     heap_freetuple(tup);
    2016             : 
    2017          20 :     table_close(rel, RowExclusiveLock);
    2018             : 
    2019          20 :     return address;
    2020             : }
    2021             : 
    2022             : /*
    2023             :  * Change publication owner -- by OID
    2024             :  */
    2025             : void
    2026           0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
    2027             : {
    2028             :     HeapTuple   tup;
    2029             :     Relation    rel;
    2030             : 
    2031           0 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2032             : 
    2033           0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
    2034             : 
    2035           0 :     if (!HeapTupleIsValid(tup))
    2036           0 :         ereport(ERROR,
    2037             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2038             :                  errmsg("publication with OID %u does not exist", subid)));
    2039             : 
    2040           0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2041             : 
    2042           0 :     heap_freetuple(tup);
    2043             : 
    2044           0 :     table_close(rel, RowExclusiveLock);
    2045           0 : }

Generated by: LCOV version 1.14