LCOV - code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 630 681 92.5 %
Date: 2025-02-21 17:14:59 Functions: 30 31 96.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * publicationcmds.c
       4             :  *      publication manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/commands/publicationcmds.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/htup_details.h"
      18             : #include "access/table.h"
      19             : #include "access/xact.h"
      20             : #include "catalog/catalog.h"
      21             : #include "catalog/indexing.h"
      22             : #include "catalog/namespace.h"
      23             : #include "catalog/objectaccess.h"
      24             : #include "catalog/objectaddress.h"
      25             : #include "catalog/pg_database.h"
      26             : #include "catalog/pg_inherits.h"
      27             : #include "catalog/pg_namespace.h"
      28             : #include "catalog/pg_proc.h"
      29             : #include "catalog/pg_publication.h"
      30             : #include "catalog/pg_publication_namespace.h"
      31             : #include "catalog/pg_publication_rel.h"
      32             : #include "commands/dbcommands.h"
      33             : #include "commands/defrem.h"
      34             : #include "commands/event_trigger.h"
      35             : #include "commands/publicationcmds.h"
      36             : #include "miscadmin.h"
      37             : #include "nodes/nodeFuncs.h"
      38             : #include "parser/parse_clause.h"
      39             : #include "parser/parse_collate.h"
      40             : #include "parser/parse_relation.h"
      41             : #include "rewrite/rewriteHandler.h"
      42             : #include "storage/lmgr.h"
      43             : #include "utils/acl.h"
      44             : #include "utils/builtins.h"
      45             : #include "utils/inval.h"
      46             : #include "utils/lsyscache.h"
      47             : #include "utils/rel.h"
      48             : #include "utils/syscache.h"
      49             : #include "utils/varlena.h"
      50             : 
      51             : 
      52             : /*
      53             :  * Information used to validate the columns in the row filter expression. See
      54             :  * contain_invalid_rfcolumn_walker for details.
      55             :  */
      56             : typedef struct rf_context
      57             : {
      58             :     Bitmapset  *bms_replident;  /* bitset of replica identity columns */
      59             :     bool        pubviaroot;     /* true if we are validating the parent
      60             :                                  * relation's row filter */
      61             :     Oid         relid;          /* relid of the relation */
      62             :     Oid         parentid;       /* relid of the parent relation */
      63             : } rf_context;
      64             : 
      65             : static List *OpenTableList(List *tables);
      66             : static void CloseTableList(List *rels);
      67             : static void LockSchemaList(List *schemalist);
      68             : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
      69             :                                  AlterPublicationStmt *stmt);
      70             : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
      71             : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
      72             :                                   AlterPublicationStmt *stmt);
      73             : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
      74             : static char defGetGeneratedColsOption(DefElem *def);
      75             : 
      76             : 
      77             : static void
      78         894 : parse_publication_options(ParseState *pstate,
      79             :                           List *options,
      80             :                           bool *publish_given,
      81             :                           PublicationActions *pubactions,
      82             :                           bool *publish_via_partition_root_given,
      83             :                           bool *publish_via_partition_root,
      84             :                           bool *publish_generated_columns_given,
      85             :                           char *publish_generated_columns)
      86             : {
      87             :     ListCell   *lc;
      88             : 
      89         894 :     *publish_given = false;
      90         894 :     *publish_via_partition_root_given = false;
      91         894 :     *publish_generated_columns_given = false;
      92             : 
      93             :     /* defaults */
      94         894 :     pubactions->pubinsert = true;
      95         894 :     pubactions->pubupdate = true;
      96         894 :     pubactions->pubdelete = true;
      97         894 :     pubactions->pubtruncate = true;
      98         894 :     *publish_via_partition_root = false;
      99         894 :     *publish_generated_columns = PUBLISH_GENCOLS_NONE;
     100             : 
     101             :     /* Parse options */
     102        1232 :     foreach(lc, options)
     103             :     {
     104         368 :         DefElem    *defel = (DefElem *) lfirst(lc);
     105             : 
     106         368 :         if (strcmp(defel->defname, "publish") == 0)
     107             :         {
     108             :             char       *publish;
     109             :             List       *publish_list;
     110             :             ListCell   *lc2;
     111             : 
     112         122 :             if (*publish_given)
     113           0 :                 errorConflictingDefElem(defel, pstate);
     114             : 
     115             :             /*
     116             :              * If publish option was given only the explicitly listed actions
     117             :              * should be published.
     118             :              */
     119         122 :             pubactions->pubinsert = false;
     120         122 :             pubactions->pubupdate = false;
     121         122 :             pubactions->pubdelete = false;
     122         122 :             pubactions->pubtruncate = false;
     123             : 
     124         122 :             *publish_given = true;
     125         122 :             publish = defGetString(defel);
     126             : 
     127         122 :             if (!SplitIdentifierString(publish, ',', &publish_list))
     128           0 :                 ereport(ERROR,
     129             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     130             :                          errmsg("invalid list syntax in parameter \"%s\"",
     131             :                                 "publish")));
     132             : 
     133             :             /* Process the option list. */
     134         270 :             foreach(lc2, publish_list)
     135             :             {
     136         154 :                 char       *publish_opt = (char *) lfirst(lc2);
     137             : 
     138         154 :                 if (strcmp(publish_opt, "insert") == 0)
     139         108 :                     pubactions->pubinsert = true;
     140          46 :                 else if (strcmp(publish_opt, "update") == 0)
     141          20 :                     pubactions->pubupdate = true;
     142          26 :                 else if (strcmp(publish_opt, "delete") == 0)
     143          10 :                     pubactions->pubdelete = true;
     144          16 :                 else if (strcmp(publish_opt, "truncate") == 0)
     145          10 :                     pubactions->pubtruncate = true;
     146             :                 else
     147           6 :                     ereport(ERROR,
     148             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     149             :                              errmsg("unrecognized value for publication option \"%s\": \"%s\"",
     150             :                                     "publish", publish_opt)));
     151             :             }
     152             :         }
     153         246 :         else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
     154             :         {
     155         164 :             if (*publish_via_partition_root_given)
     156           6 :                 errorConflictingDefElem(defel, pstate);
     157         158 :             *publish_via_partition_root_given = true;
     158         158 :             *publish_via_partition_root = defGetBoolean(defel);
     159             :         }
     160          82 :         else if (strcmp(defel->defname, "publish_generated_columns") == 0)
     161             :         {
     162          76 :             if (*publish_generated_columns_given)
     163           6 :                 errorConflictingDefElem(defel, pstate);
     164          70 :             *publish_generated_columns_given = true;
     165          70 :             *publish_generated_columns = defGetGeneratedColsOption(defel);
     166             :         }
     167             :         else
     168           6 :             ereport(ERROR,
     169             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     170             :                      errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
     171             :     }
     172         864 : }
     173             : 
     174             : /*
     175             :  * Convert the PublicationObjSpecType list into schema oid list and
     176             :  * PublicationTable list.
     177             :  */
     178             : static void
     179        1610 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
     180             :                            List **rels, List **schemas)
     181             : {
     182             :     ListCell   *cell;
     183             :     PublicationObjSpec *pubobj;
     184             : 
     185        1610 :     if (!pubobjspec_list)
     186          86 :         return;
     187             : 
     188        3210 :     foreach(cell, pubobjspec_list)
     189             :     {
     190             :         Oid         schemaid;
     191             :         List       *search_path;
     192             : 
     193        1722 :         pubobj = (PublicationObjSpec *) lfirst(cell);
     194             : 
     195        1722 :         switch (pubobj->pubobjtype)
     196             :         {
     197        1366 :             case PUBLICATIONOBJ_TABLE:
     198        1366 :                 *rels = lappend(*rels, pubobj->pubtable);
     199        1366 :                 break;
     200         332 :             case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
     201         332 :                 schemaid = get_namespace_oid(pubobj->name, false);
     202             : 
     203             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     204         302 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     205         302 :                 break;
     206          24 :             case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
     207          24 :                 search_path = fetch_search_path(false);
     208          24 :                 if (search_path == NIL) /* nothing valid in search_path? */
     209           6 :                     ereport(ERROR,
     210             :                             errcode(ERRCODE_UNDEFINED_SCHEMA),
     211             :                             errmsg("no schema has been selected for CURRENT_SCHEMA"));
     212             : 
     213          18 :                 schemaid = linitial_oid(search_path);
     214          18 :                 list_free(search_path);
     215             : 
     216             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     217          18 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     218          18 :                 break;
     219           0 :             default:
     220             :                 /* shouldn't happen */
     221           0 :                 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
     222             :                 break;
     223             :         }
     224             :     }
     225             : }
     226             : 
     227             : /*
     228             :  * Returns true if any of the columns used in the row filter WHERE expression is
     229             :  * not part of REPLICA IDENTITY, false otherwise.
     230             :  */
     231             : static bool
     232         258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
     233             : {
     234         258 :     if (node == NULL)
     235           0 :         return false;
     236             : 
     237         258 :     if (IsA(node, Var))
     238             :     {
     239         104 :         Var        *var = (Var *) node;
     240         104 :         AttrNumber  attnum = var->varattno;
     241             : 
     242             :         /*
     243             :          * If pubviaroot is true, we are validating the row filter of the
     244             :          * parent table, but the bitmap contains the replica identity
     245             :          * information of the child table. So, get the column number of the
     246             :          * child table as parent and child column order could be different.
     247             :          */
     248         104 :         if (context->pubviaroot)
     249             :         {
     250          16 :             char       *colname = get_attname(context->parentid, attnum, false);
     251             : 
     252          16 :             attnum = get_attnum(context->relid, colname);
     253             :         }
     254             : 
     255         104 :         if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
     256         104 :                            context->bms_replident))
     257          60 :             return true;
     258             :     }
     259             : 
     260         198 :     return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
     261             :                                   context);
     262             : }
     263             : 
     264             : /*
     265             :  * Check if all columns referenced in the filter expression are part of the
     266             :  * REPLICA IDENTITY index or not.
     267             :  *
     268             :  * Returns true if any invalid column is found.
     269             :  */
     270             : bool
     271         710 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     272             :                                bool pubviaroot)
     273             : {
     274             :     HeapTuple   rftuple;
     275         710 :     Oid         relid = RelationGetRelid(relation);
     276         710 :     Oid         publish_as_relid = RelationGetRelid(relation);
     277         710 :     bool        result = false;
     278             :     Datum       rfdatum;
     279             :     bool        rfisnull;
     280             : 
     281             :     /*
     282             :      * FULL means all columns are in the REPLICA IDENTITY, so all columns are
     283             :      * allowed in the row filter and we can skip the validation.
     284             :      */
     285         710 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     286         186 :         return false;
     287             : 
     288             :     /*
     289             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     290             :      * is published via this publication as we need to use its row filter
     291             :      * expression to filter the partition's changes.
     292             :      *
     293             :      * Note that even though the row filter used is for an ancestor, the
     294             :      * REPLICA IDENTITY used will be for the actual child table.
     295             :      */
     296         524 :     if (pubviaroot && relation->rd_rel->relispartition)
     297             :     {
     298             :         publish_as_relid
     299         108 :             = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     300             : 
     301         108 :         if (!OidIsValid(publish_as_relid))
     302           6 :             publish_as_relid = relid;
     303             :     }
     304             : 
     305         524 :     rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     306             :                               ObjectIdGetDatum(publish_as_relid),
     307             :                               ObjectIdGetDatum(pubid));
     308             : 
     309         524 :     if (!HeapTupleIsValid(rftuple))
     310          56 :         return false;
     311             : 
     312         468 :     rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     313             :                               Anum_pg_publication_rel_prqual,
     314             :                               &rfisnull);
     315             : 
     316         468 :     if (!rfisnull)
     317             :     {
     318         102 :         rf_context  context = {0};
     319             :         Node       *rfnode;
     320         102 :         Bitmapset  *bms = NULL;
     321             : 
     322         102 :         context.pubviaroot = pubviaroot;
     323         102 :         context.parentid = publish_as_relid;
     324         102 :         context.relid = relid;
     325             : 
     326             :         /* Remember columns that are part of the REPLICA IDENTITY */
     327         102 :         bms = RelationGetIndexAttrBitmap(relation,
     328             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     329             : 
     330         102 :         context.bms_replident = bms;
     331         102 :         rfnode = stringToNode(TextDatumGetCString(rfdatum));
     332         102 :         result = contain_invalid_rfcolumn_walker(rfnode, &context);
     333             :     }
     334             : 
     335         468 :     ReleaseSysCache(rftuple);
     336             : 
     337         468 :     return result;
     338             : }
     339             : 
     340             : /*
     341             :  * Check for invalid columns in the publication table definition.
     342             :  *
     343             :  * This function evaluates two conditions:
     344             :  *
     345             :  * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
     346             :  *    by the column list. If any column is missing, *invalid_column_list is set
     347             :  *    to true.
     348             :  * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
     349             :  *    are published, either by being explicitly named in the column list or, if
     350             :  *    no column list is specified, by setting the option
     351             :  *    publish_generated_columns to stored. If any unpublished
     352             :  *    generated column is found, *invalid_gen_col is set to true.
     353             :  *
     354             :  * Returns true if any of the above conditions are not met.
     355             :  */
     356             : bool
     357         902 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     358             :                             bool pubviaroot, char pubgencols_type,
     359             :                             bool *invalid_column_list,
     360             :                             bool *invalid_gen_col)
     361             : {
     362         902 :     Oid         relid = RelationGetRelid(relation);
     363         902 :     Oid         publish_as_relid = RelationGetRelid(relation);
     364             :     Bitmapset  *idattrs;
     365         902 :     Bitmapset  *columns = NULL;
     366         902 :     TupleDesc   desc = RelationGetDescr(relation);
     367             :     Publication *pub;
     368             :     int         x;
     369             : 
     370         902 :     *invalid_column_list = false;
     371         902 :     *invalid_gen_col = false;
     372             : 
     373             :     /*
     374             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     375             :      * is published via this publication as we need to use its column list for
     376             :      * the changes.
     377             :      *
     378             :      * Note that even though the column list used is for an ancestor, the
     379             :      * REPLICA IDENTITY used will be for the actual child table.
     380             :      */
     381         902 :     if (pubviaroot && relation->rd_rel->relispartition)
     382             :     {
     383         160 :         publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     384             : 
     385         160 :         if (!OidIsValid(publish_as_relid))
     386          36 :             publish_as_relid = relid;
     387             :     }
     388             : 
     389             :     /* Fetch the column list */
     390         902 :     pub = GetPublication(pubid);
     391         902 :     check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
     392             : 
     393         902 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     394             :     {
     395             :         /* With REPLICA IDENTITY FULL, no column list is allowed. */
     396         200 :         *invalid_column_list = (columns != NULL);
     397             : 
     398             :         /*
     399             :          * As we don't allow a column list with REPLICA IDENTITY FULL, the
     400             :          * publish_generated_columns option must be set to stored if the table
     401             :          * has any stored generated columns.
     402             :          */
     403         200 :         if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
     404         188 :             relation->rd_att->constr &&
     405          76 :             relation->rd_att->constr->has_generated_stored)
     406           6 :             *invalid_gen_col = true;
     407             : 
     408             :         /*
     409             :          * Virtual generated columns are currently not supported for logical
     410             :          * replication at all.
     411             :          */
     412         200 :         if (relation->rd_att->constr &&
     413          88 :             relation->rd_att->constr->has_generated_virtual)
     414          12 :             *invalid_gen_col = true;
     415             : 
     416         200 :         if (*invalid_gen_col && *invalid_column_list)
     417           0 :             return true;
     418             :     }
     419             : 
     420             :     /* Remember columns that are part of the REPLICA IDENTITY */
     421         902 :     idattrs = RelationGetIndexAttrBitmap(relation,
     422             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     423             : 
     424             :     /*
     425             :      * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
     426             :      * (to handle system columns the usual way), while column list does not
     427             :      * use offset, so we can't do bms_is_subset(). Instead, we have to loop
     428             :      * over the idattrs and check all of them are in the list.
     429             :      */
     430         902 :     x = -1;
     431        1548 :     while ((x = bms_next_member(idattrs, x)) >= 0)
     432             :     {
     433         652 :         AttrNumber  attnum = (x + FirstLowInvalidHeapAttributeNumber);
     434         652 :         Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
     435             : 
     436         652 :         if (columns == NULL)
     437             :         {
     438             :             /*
     439             :              * The publish_generated_columns option must be set to stored if
     440             :              * the REPLICA IDENTITY contains any stored generated column.
     441             :              */
     442         442 :             if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
     443             :             {
     444           6 :                 *invalid_gen_col = true;
     445           6 :                 break;
     446             :             }
     447             : 
     448             :             /*
     449             :              * The equivalent setting for virtual generated columns does not
     450             :              * exist yet.
     451             :              */
     452         436 :             if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
     453             :             {
     454           0 :                 *invalid_gen_col = true;
     455           0 :                 break;
     456             :             }
     457             : 
     458             :             /* Skip validating the column list since it is not defined */
     459         436 :             continue;
     460             :         }
     461             : 
     462             :         /*
     463             :          * If pubviaroot is true, we are validating the column list of the
     464             :          * parent table, but the bitmap contains the replica identity
     465             :          * information of the child table. The parent/child attnums may not
     466             :          * match, so translate them to the parent - get the attname from the
     467             :          * child, and look it up in the parent.
     468             :          */
     469         210 :         if (pubviaroot)
     470             :         {
     471             :             /* attribute name in the child table */
     472          80 :             char       *colname = get_attname(relid, attnum, false);
     473             : 
     474             :             /*
     475             :              * Determine the attnum for the attribute name in parent (we are
     476             :              * using the column list defined on the parent).
     477             :              */
     478          80 :             attnum = get_attnum(publish_as_relid, colname);
     479             :         }
     480             : 
     481             :         /* replica identity column, not covered by the column list */
     482         210 :         *invalid_column_list |= !bms_is_member(attnum, columns);
     483             : 
     484         210 :         if (*invalid_column_list && *invalid_gen_col)
     485           0 :             break;
     486             :     }
     487             : 
     488         902 :     bms_free(columns);
     489         902 :     bms_free(idattrs);
     490             : 
     491         902 :     return *invalid_column_list || *invalid_gen_col;
     492             : }
     493             : 
     494             : /* check_functions_in_node callback */
     495             : static bool
     496         430 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
     497             : {
     498         430 :     return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
     499             :             func_id >= FirstNormalObjectId);
     500             : }
     501             : 
     502             : /*
     503             :  * The row filter walker checks if the row filter expression is a "simple
     504             :  * expression".
     505             :  *
     506             :  * It allows only simple or compound expressions such as:
     507             :  * - (Var Op Const)
     508             :  * - (Var Op Var)
     509             :  * - (Var Op Const) AND/OR (Var Op Const)
     510             :  * - etc
     511             :  * (where Var is a column of the table this filter belongs to)
     512             :  *
     513             :  * The simple expression has the following restrictions:
     514             :  * - User-defined operators are not allowed;
     515             :  * - User-defined functions are not allowed;
     516             :  * - User-defined types are not allowed;
     517             :  * - User-defined collations are not allowed;
     518             :  * - Non-immutable built-in functions are not allowed;
     519             :  * - System columns are not allowed.
     520             :  *
     521             :  * NOTES
     522             :  *
     523             :  * We don't allow user-defined functions/operators/types/collations because
     524             :  * (a) if a user drops a user-defined object used in a row filter expression or
     525             :  * if there is any other error while using it, the logical decoding
     526             :  * infrastructure won't be able to recover from such an error even if the
     527             :  * object is recreated again because a historic snapshot is used to evaluate
     528             :  * the row filter;
     529             :  * (b) a user-defined function can be used to access tables that could have
     530             :  * unpleasant results because a historic snapshot is used. That's why only
     531             :  * immutable built-in functions are allowed in row filter expressions.
     532             :  *
     533             :  * We don't allow system columns because currently, we don't have that
     534             :  * information in the tuple passed to downstream. Also, as we don't replicate
     535             :  * those to subscribers, there doesn't seem to be a need for a filter on those
     536             :  * columns.
     537             :  *
     538             :  * We can allow other node types after more analysis and testing.
     539             :  */
     540             : static bool
     541        1392 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
     542             : {
     543        1392 :     char       *errdetail_msg = NULL;
     544             : 
     545        1392 :     if (node == NULL)
     546           6 :         return false;
     547             : 
     548        1386 :     switch (nodeTag(node))
     549             :     {
     550         370 :         case T_Var:
     551             :             /* System columns are not allowed. */
     552         370 :             if (((Var *) node)->varattno < InvalidAttrNumber)
     553           6 :                 errdetail_msg = _("System columns are not allowed.");
     554         370 :             break;
     555         380 :         case T_OpExpr:
     556             :         case T_DistinctExpr:
     557             :         case T_NullIfExpr:
     558             :             /* OK, except user-defined operators are not allowed. */
     559         380 :             if (((OpExpr *) node)->opno >= FirstNormalObjectId)
     560           6 :                 errdetail_msg = _("User-defined operators are not allowed.");
     561         380 :             break;
     562           6 :         case T_ScalarArrayOpExpr:
     563             :             /* OK, except user-defined operators are not allowed. */
     564           6 :             if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
     565           0 :                 errdetail_msg = _("User-defined operators are not allowed.");
     566             : 
     567             :             /*
     568             :              * We don't need to check the hashfuncid and negfuncid of
     569             :              * ScalarArrayOpExpr as those functions are only built for a
     570             :              * subquery.
     571             :              */
     572           6 :             break;
     573           6 :         case T_RowCompareExpr:
     574             :             {
     575             :                 ListCell   *opid;
     576             : 
     577             :                 /* OK, except user-defined operators are not allowed. */
     578          18 :                 foreach(opid, ((RowCompareExpr *) node)->opnos)
     579             :                 {
     580          12 :                     if (lfirst_oid(opid) >= FirstNormalObjectId)
     581             :                     {
     582           0 :                         errdetail_msg = _("User-defined operators are not allowed.");
     583           0 :                         break;
     584             :                     }
     585             :                 }
     586             :             }
     587           6 :             break;
     588         618 :         case T_Const:
     589             :         case T_FuncExpr:
     590             :         case T_BoolExpr:
     591             :         case T_RelabelType:
     592             :         case T_CollateExpr:
     593             :         case T_CaseExpr:
     594             :         case T_CaseTestExpr:
     595             :         case T_ArrayExpr:
     596             :         case T_RowExpr:
     597             :         case T_CoalesceExpr:
     598             :         case T_MinMaxExpr:
     599             :         case T_XmlExpr:
     600             :         case T_NullTest:
     601             :         case T_BooleanTest:
     602             :         case T_List:
     603             :             /* OK, supported */
     604         618 :             break;
     605           6 :         default:
     606           6 :             errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
     607           6 :             break;
     608             :     }
     609             : 
     610             :     /*
     611             :      * For all the supported nodes, if we haven't already found a problem,
     612             :      * check the types, functions, and collations used in it.  We check List
     613             :      * by walking through each element.
     614             :      */
     615        1386 :     if (!errdetail_msg && !IsA(node, List))
     616             :     {
     617        1314 :         if (exprType(node) >= FirstNormalObjectId)
     618           6 :             errdetail_msg = _("User-defined types are not allowed.");
     619        1308 :         else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
     620             :                                          pstate))
     621          18 :             errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
     622        2580 :         else if (exprCollation(node) >= FirstNormalObjectId ||
     623        1290 :                  exprInputCollation(node) >= FirstNormalObjectId)
     624           6 :             errdetail_msg = _("User-defined collations are not allowed.");
     625             :     }
     626             : 
     627             :     /*
     628             :      * If we found a problem in this node, throw error now. Otherwise keep
     629             :      * going.
     630             :      */
     631        1386 :     if (errdetail_msg)
     632          48 :         ereport(ERROR,
     633             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     634             :                  errmsg("invalid publication WHERE expression"),
     635             :                  errdetail_internal("%s", errdetail_msg),
     636             :                  parser_errposition(pstate, exprLocation(node))));
     637             : 
     638        1338 :     return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
     639             :                                   pstate);
     640             : }
     641             : 
     642             : /*
     643             :  * Check if the row filter expression is a "simple expression".
     644             :  *
     645             :  * See check_simple_rowfilter_expr_walker for details.
     646             :  */
     647             : static bool
     648         358 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
     649             : {
     650         358 :     return check_simple_rowfilter_expr_walker(node, pstate);
     651             : }
     652             : 
     653             : /*
     654             :  * Transform the publication WHERE expression for all the relations in the list,
     655             :  * ensuring it is coerced to boolean and necessary collation information is
     656             :  * added if required, and add a new nsitem/RTE for the associated relation to
     657             :  * the ParseState's namespace list.
     658             :  *
     659             :  * Also check the publication row filter expression and throw an error if
     660             :  * anything not permitted or unexpected is encountered.
     661             :  */
     662             : static void
     663        1144 : TransformPubWhereClauses(List *tables, const char *queryString,
     664             :                          bool pubviaroot)
     665             : {
     666             :     ListCell   *lc;
     667             : 
     668        2282 :     foreach(lc, tables)
     669             :     {
     670             :         ParseNamespaceItem *nsitem;
     671        1204 :         Node       *whereclause = NULL;
     672             :         ParseState *pstate;
     673        1204 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     674             : 
     675        1204 :         if (pri->whereClause == NULL)
     676         828 :             continue;
     677             : 
     678             :         /*
     679             :          * If the publication doesn't publish changes via the root partitioned
     680             :          * table, the partition's row filter will be used. So disallow using
     681             :          * WHERE clause on partitioned table in this case.
     682             :          */
     683         376 :         if (!pubviaroot &&
     684         354 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     685           6 :             ereport(ERROR,
     686             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     687             :                      errmsg("cannot use publication WHERE clause for relation \"%s\"",
     688             :                             RelationGetRelationName(pri->relation)),
     689             :                      errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
     690             :                                "publish_via_partition_root")));
     691             : 
     692             :         /*
     693             :          * A fresh pstate is required so that we only have "this" table in its
     694             :          * rangetable
     695             :          */
     696         370 :         pstate = make_parsestate(NULL);
     697         370 :         pstate->p_sourcetext = queryString;
     698         370 :         nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
     699             :                                                AccessShareLock, NULL,
     700             :                                                false, false);
     701         370 :         addNSItemToQuery(pstate, nsitem, false, true, true);
     702             : 
     703         370 :         whereclause = transformWhereClause(pstate,
     704         370 :                                            copyObject(pri->whereClause),
     705             :                                            EXPR_KIND_WHERE,
     706             :                                            "PUBLICATION WHERE");
     707             : 
     708             :         /* Fix up collation information */
     709         358 :         assign_expr_collations(pstate, whereclause);
     710             : 
     711         358 :         whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
     712             : 
     713             :         /*
     714             :          * We allow only simple expressions in row filters. See
     715             :          * check_simple_rowfilter_expr_walker.
     716             :          */
     717         358 :         check_simple_rowfilter_expr(whereclause, pstate);
     718             : 
     719         310 :         free_parsestate(pstate);
     720             : 
     721         310 :         pri->whereClause = whereclause;
     722             :     }
     723        1078 : }
     724             : 
     725             : 
     726             : /*
     727             :  * Given a list of tables that are going to be added to a publication,
     728             :  * verify that they fulfill the necessary preconditions, namely: no tables
     729             :  * have a column list if any schema is published; and partitioned tables do
     730             :  * not have column lists if publish_via_partition_root is not set.
     731             :  *
     732             :  * 'publish_schema' indicates that the publication contains any TABLES IN
     733             :  * SCHEMA elements (newly added in this command, or preexisting).
     734             :  * 'pubviaroot' is the value of publish_via_partition_root.
     735             :  */
     736             : static void
     737        1078 : CheckPubRelationColumnList(char *pubname, List *tables,
     738             :                            bool publish_schema, bool pubviaroot)
     739             : {
     740             :     ListCell   *lc;
     741             : 
     742        2186 :     foreach(lc, tables)
     743             :     {
     744        1138 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     745             : 
     746        1138 :         if (pri->columns == NIL)
     747         758 :             continue;
     748             : 
     749             :         /*
     750             :          * Disallow specifying column list if any schema is in the
     751             :          * publication.
     752             :          *
     753             :          * XXX We could instead just forbid the case when the publication
     754             :          * tries to publish the table with a column list and a schema for that
     755             :          * table. However, if we do that then we need a restriction during
     756             :          * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
     757             :          * seem to be a good idea.
     758             :          */
     759         380 :         if (publish_schema)
     760          24 :             ereport(ERROR,
     761             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     762             :                     errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     763             :                            get_namespace_name(RelationGetNamespace(pri->relation)),
     764             :                            RelationGetRelationName(pri->relation), pubname),
     765             :                     errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
     766             : 
     767             :         /*
     768             :          * If the publication doesn't publish changes via the root partitioned
     769             :          * table, the partition's column list will be used. So disallow using
     770             :          * a column list on the partitioned table in this case.
     771             :          */
     772         356 :         if (!pubviaroot &&
     773         280 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     774           6 :             ereport(ERROR,
     775             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     776             :                      errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     777             :                             get_namespace_name(RelationGetNamespace(pri->relation)),
     778             :                             RelationGetRelationName(pri->relation), pubname),
     779             :                      errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
     780             :                                "publish_via_partition_root")));
     781             :     }
     782        1048 : }
     783             : 
     784             : /*
     785             :  * Create new publication.
     786             :  */
     787             : ObjectAddress
     788         790 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
     789             : {
     790             :     Relation    rel;
     791             :     ObjectAddress myself;
     792             :     Oid         puboid;
     793             :     bool        nulls[Natts_pg_publication];
     794             :     Datum       values[Natts_pg_publication];
     795             :     HeapTuple   tup;
     796             :     bool        publish_given;
     797             :     PublicationActions pubactions;
     798             :     bool        publish_via_partition_root_given;
     799             :     bool        publish_via_partition_root;
     800             :     bool        publish_generated_columns_given;
     801             :     char        publish_generated_columns;
     802             :     AclResult   aclresult;
     803         790 :     List       *relations = NIL;
     804         790 :     List       *schemaidlist = NIL;
     805             : 
     806             :     /* must have CREATE privilege on database */
     807         790 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
     808         790 :     if (aclresult != ACLCHECK_OK)
     809           6 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     810           6 :                        get_database_name(MyDatabaseId));
     811             : 
     812             :     /* FOR ALL TABLES requires superuser */
     813         784 :     if (stmt->for_all_tables && !superuser())
     814           0 :         ereport(ERROR,
     815             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     816             :                  errmsg("must be superuser to create FOR ALL TABLES publication")));
     817             : 
     818         784 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     819             : 
     820             :     /* Check if name is used */
     821         784 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     822             :                              CStringGetDatum(stmt->pubname));
     823         784 :     if (OidIsValid(puboid))
     824           6 :         ereport(ERROR,
     825             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     826             :                  errmsg("publication \"%s\" already exists",
     827             :                         stmt->pubname)));
     828             : 
     829             :     /* Form a tuple. */
     830         778 :     memset(values, 0, sizeof(values));
     831         778 :     memset(nulls, false, sizeof(nulls));
     832             : 
     833         778 :     values[Anum_pg_publication_pubname - 1] =
     834         778 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
     835         778 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
     836             : 
     837         778 :     parse_publication_options(pstate,
     838             :                               stmt->options,
     839             :                               &publish_given, &pubactions,
     840             :                               &publish_via_partition_root_given,
     841             :                               &publish_via_partition_root,
     842             :                               &publish_generated_columns_given,
     843             :                               &publish_generated_columns);
     844             : 
     845         748 :     puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
     846             :                                 Anum_pg_publication_oid);
     847         748 :     values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
     848         748 :     values[Anum_pg_publication_puballtables - 1] =
     849         748 :         BoolGetDatum(stmt->for_all_tables);
     850         748 :     values[Anum_pg_publication_pubinsert - 1] =
     851         748 :         BoolGetDatum(pubactions.pubinsert);
     852         748 :     values[Anum_pg_publication_pubupdate - 1] =
     853         748 :         BoolGetDatum(pubactions.pubupdate);
     854         748 :     values[Anum_pg_publication_pubdelete - 1] =
     855         748 :         BoolGetDatum(pubactions.pubdelete);
     856         748 :     values[Anum_pg_publication_pubtruncate - 1] =
     857         748 :         BoolGetDatum(pubactions.pubtruncate);
     858         748 :     values[Anum_pg_publication_pubviaroot - 1] =
     859         748 :         BoolGetDatum(publish_via_partition_root);
     860         748 :     values[Anum_pg_publication_pubgencols - 1] =
     861         748 :         CharGetDatum(publish_generated_columns);
     862             : 
     863         748 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     864             : 
     865             :     /* Insert tuple into catalog. */
     866         748 :     CatalogTupleInsert(rel, tup);
     867         748 :     heap_freetuple(tup);
     868             : 
     869         748 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
     870             : 
     871         748 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
     872             : 
     873             :     /* Make the changes visible. */
     874         748 :     CommandCounterIncrement();
     875             : 
     876             :     /* Associate objects with the publication. */
     877         748 :     if (stmt->for_all_tables)
     878             :     {
     879             :         /* Invalidate relcache so that publication info is rebuilt. */
     880          88 :         CacheInvalidateRelcacheAll();
     881             :     }
     882             :     else
     883             :     {
     884         660 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
     885             :                                    &schemaidlist);
     886             : 
     887             :         /* FOR TABLES IN SCHEMA requires superuser */
     888         642 :         if (schemaidlist != NIL && !superuser())
     889           6 :             ereport(ERROR,
     890             :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     891             :                     errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
     892             : 
     893         636 :         if (relations != NIL)
     894             :         {
     895             :             List       *rels;
     896             : 
     897         442 :             rels = OpenTableList(relations);
     898         418 :             TransformPubWhereClauses(rels, pstate->p_sourcetext,
     899             :                                      publish_via_partition_root);
     900             : 
     901         388 :             CheckPubRelationColumnList(stmt->pubname, rels,
     902             :                                        schemaidlist != NIL,
     903             :                                        publish_via_partition_root);
     904             : 
     905         382 :             PublicationAddTables(puboid, rels, true, NULL);
     906         356 :             CloseTableList(rels);
     907             :         }
     908             : 
     909         550 :         if (schemaidlist != NIL)
     910             :         {
     911             :             /*
     912             :              * Schema lock is held until the publication is created to prevent
     913             :              * concurrent schema deletion.
     914             :              */
     915         134 :             LockSchemaList(schemaidlist);
     916         134 :             PublicationAddSchemas(puboid, schemaidlist, true, NULL);
     917             :         }
     918             :     }
     919             : 
     920         632 :     table_close(rel, RowExclusiveLock);
     921             : 
     922         632 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
     923             : 
     924         632 :     if (wal_level != WAL_LEVEL_LOGICAL)
     925         368 :         ereport(WARNING,
     926             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     927             :                  errmsg("\"wal_level\" is insufficient to publish logical changes"),
     928             :                  errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
     929             : 
     930         632 :     return myself;
     931             : }
     932             : 
     933             : /*
     934             :  * Change options of a publication.
     935             :  */
     936             : static void
     937         116 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
     938             :                         Relation rel, HeapTuple tup)
     939             : {
     940             :     bool        nulls[Natts_pg_publication];
     941             :     bool        replaces[Natts_pg_publication];
     942             :     Datum       values[Natts_pg_publication];
     943             :     bool        publish_given;
     944             :     PublicationActions pubactions;
     945             :     bool        publish_via_partition_root_given;
     946             :     bool        publish_via_partition_root;
     947             :     bool        publish_generated_columns_given;
     948             :     char        publish_generated_columns;
     949             :     ObjectAddress obj;
     950             :     Form_pg_publication pubform;
     951         116 :     List       *root_relids = NIL;
     952             :     ListCell   *lc;
     953             : 
     954         116 :     parse_publication_options(pstate,
     955             :                               stmt->options,
     956             :                               &publish_given, &pubactions,
     957             :                               &publish_via_partition_root_given,
     958             :                               &publish_via_partition_root,
     959             :                               &publish_generated_columns_given,
     960             :                               &publish_generated_columns);
     961             : 
     962         116 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
     963             : 
     964             :     /*
     965             :      * If the publication doesn't publish changes via the root partitioned
     966             :      * table, the partition's row filter and column list will be used. So
     967             :      * disallow using WHERE clause and column lists on partitioned table in
     968             :      * this case.
     969             :      */
     970         116 :     if (!pubform->puballtables && publish_via_partition_root_given &&
     971          80 :         !publish_via_partition_root)
     972             :     {
     973             :         /*
     974             :          * Lock the publication so nobody else can do anything with it. This
     975             :          * prevents concurrent alter to add partitioned table(s) with WHERE
     976             :          * clause(s) and/or column lists which we don't allow when not
     977             :          * publishing via root.
     978             :          */
     979          48 :         LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
     980             :                            AccessShareLock);
     981             : 
     982          48 :         root_relids = GetPublicationRelations(pubform->oid,
     983             :                                               PUBLICATION_PART_ROOT);
     984             : 
     985          84 :         foreach(lc, root_relids)
     986             :         {
     987          48 :             Oid         relid = lfirst_oid(lc);
     988             :             HeapTuple   rftuple;
     989             :             char        relkind;
     990             :             char       *relname;
     991             :             bool        has_rowfilter;
     992             :             bool        has_collist;
     993             : 
     994             :             /*
     995             :              * Beware: we don't have lock on the relations, so cope silently
     996             :              * with the cache lookups returning NULL.
     997             :              */
     998             : 
     999          48 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1000             :                                       ObjectIdGetDatum(relid),
    1001             :                                       ObjectIdGetDatum(pubform->oid));
    1002          48 :             if (!HeapTupleIsValid(rftuple))
    1003           0 :                 continue;
    1004          48 :             has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
    1005          48 :             has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
    1006          48 :             if (!has_rowfilter && !has_collist)
    1007             :             {
    1008          12 :                 ReleaseSysCache(rftuple);
    1009          12 :                 continue;
    1010             :             }
    1011             : 
    1012          36 :             relkind = get_rel_relkind(relid);
    1013          36 :             if (relkind != RELKIND_PARTITIONED_TABLE)
    1014             :             {
    1015          24 :                 ReleaseSysCache(rftuple);
    1016          24 :                 continue;
    1017             :             }
    1018          12 :             relname = get_rel_name(relid);
    1019          12 :             if (relname == NULL)    /* table concurrently dropped */
    1020             :             {
    1021           0 :                 ReleaseSysCache(rftuple);
    1022           0 :                 continue;
    1023             :             }
    1024             : 
    1025          12 :             if (has_rowfilter)
    1026           6 :                 ereport(ERROR,
    1027             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1028             :                          errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1029             :                                 "publish_via_partition_root",
    1030             :                                 stmt->pubname),
    1031             :                          errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1032             :                                    relname, "publish_via_partition_root")));
    1033             :             Assert(has_collist);
    1034           6 :             ereport(ERROR,
    1035             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1036             :                      errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1037             :                             "publish_via_partition_root",
    1038             :                             stmt->pubname),
    1039             :                      errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1040             :                                relname, "publish_via_partition_root")));
    1041             :         }
    1042             :     }
    1043             : 
    1044             :     /* Everything ok, form a new tuple. */
    1045         104 :     memset(values, 0, sizeof(values));
    1046         104 :     memset(nulls, false, sizeof(nulls));
    1047         104 :     memset(replaces, false, sizeof(replaces));
    1048             : 
    1049         104 :     if (publish_given)
    1050             :     {
    1051          28 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
    1052          28 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
    1053             : 
    1054          28 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
    1055          28 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
    1056             : 
    1057          28 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
    1058          28 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
    1059             : 
    1060          28 :         values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
    1061          28 :         replaces[Anum_pg_publication_pubtruncate - 1] = true;
    1062             :     }
    1063             : 
    1064         104 :     if (publish_via_partition_root_given)
    1065             :     {
    1066          70 :         values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
    1067          70 :         replaces[Anum_pg_publication_pubviaroot - 1] = true;
    1068             :     }
    1069             : 
    1070         104 :     if (publish_generated_columns_given)
    1071             :     {
    1072           6 :         values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
    1073           6 :         replaces[Anum_pg_publication_pubgencols - 1] = true;
    1074             :     }
    1075             : 
    1076         104 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1077             :                             replaces);
    1078             : 
    1079             :     /* Update the catalog. */
    1080         104 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1081             : 
    1082         104 :     CommandCounterIncrement();
    1083             : 
    1084         104 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1085             : 
    1086             :     /* Invalidate the relcache. */
    1087         104 :     if (pubform->puballtables)
    1088             :     {
    1089           8 :         CacheInvalidateRelcacheAll();
    1090             :     }
    1091             :     else
    1092             :     {
    1093          96 :         List       *relids = NIL;
    1094          96 :         List       *schemarelids = NIL;
    1095             : 
    1096             :         /*
    1097             :          * For any partitioned tables contained in the publication, we must
    1098             :          * invalidate all partitions contained in the respective partition
    1099             :          * trees, not just those explicitly mentioned in the publication.
    1100             :          */
    1101          96 :         if (root_relids == NIL)
    1102          60 :             relids = GetPublicationRelations(pubform->oid,
    1103             :                                              PUBLICATION_PART_ALL);
    1104             :         else
    1105             :         {
    1106             :             /*
    1107             :              * We already got tables explicitly mentioned in the publication.
    1108             :              * Now get all partitions for the partitioned table in the list.
    1109             :              */
    1110          72 :             foreach(lc, root_relids)
    1111          36 :                 relids = GetPubPartitionOptionRelations(relids,
    1112             :                                                         PUBLICATION_PART_ALL,
    1113             :                                                         lfirst_oid(lc));
    1114             :         }
    1115             : 
    1116          96 :         schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
    1117             :                                                         PUBLICATION_PART_ALL);
    1118          96 :         relids = list_concat_unique_oid(relids, schemarelids);
    1119             : 
    1120          96 :         InvalidatePublicationRels(relids);
    1121             :     }
    1122             : 
    1123         104 :     ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
    1124         104 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1125             :                                      (Node *) stmt);
    1126             : 
    1127         104 :     InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
    1128         104 : }
    1129             : 
    1130             : /*
    1131             :  * Invalidate the relations.
    1132             :  */
    1133             : void
    1134        2336 : InvalidatePublicationRels(List *relids)
    1135             : {
    1136             :     /*
    1137             :      * We don't want to send too many individual messages, at some point it's
    1138             :      * cheaper to just reset whole relcache.
    1139             :      */
    1140        2336 :     if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
    1141             :     {
    1142             :         ListCell   *lc;
    1143             : 
    1144       19572 :         foreach(lc, relids)
    1145       17236 :             CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
    1146             :     }
    1147             :     else
    1148           0 :         CacheInvalidateRelcacheAll();
    1149        2336 : }
    1150             : 
    1151             : /*
    1152             :  * Add or remove table to/from publication.
    1153             :  */
    1154             : static void
    1155         890 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
    1156             :                        List *tables, const char *queryString,
    1157             :                        bool publish_schema)
    1158             : {
    1159         890 :     List       *rels = NIL;
    1160         890 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1161         890 :     Oid         pubid = pubform->oid;
    1162             : 
    1163             :     /*
    1164             :      * Nothing to do if no objects, except in SET: for that it is quite
    1165             :      * possible that user has not specified any tables in which case we need
    1166             :      * to remove all the existing tables.
    1167             :      */
    1168         890 :     if (!tables && stmt->action != AP_SetObjects)
    1169          66 :         return;
    1170             : 
    1171         824 :     rels = OpenTableList(tables);
    1172             : 
    1173         824 :     if (stmt->action == AP_AddObjects)
    1174             :     {
    1175         280 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1176             : 
    1177         262 :         publish_schema |= is_schema_publication(pubid);
    1178             : 
    1179         262 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1180         262 :                                    pubform->pubviaroot);
    1181             : 
    1182         250 :         PublicationAddTables(pubid, rels, false, stmt);
    1183             :     }
    1184         544 :     else if (stmt->action == AP_DropObjects)
    1185          98 :         PublicationDropTables(pubid, rels, false);
    1186             :     else                        /* AP_SetObjects */
    1187             :     {
    1188         446 :         List       *oldrelids = GetPublicationRelations(pubid,
    1189             :                                                         PUBLICATION_PART_ROOT);
    1190         446 :         List       *delrels = NIL;
    1191             :         ListCell   *oldlc;
    1192             : 
    1193         446 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1194             : 
    1195         428 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1196         428 :                                    pubform->pubviaroot);
    1197             : 
    1198             :         /*
    1199             :          * To recreate the relation list for the publication, look for
    1200             :          * existing relations that do not need to be dropped.
    1201             :          */
    1202         806 :         foreach(oldlc, oldrelids)
    1203             :         {
    1204         402 :             Oid         oldrelid = lfirst_oid(oldlc);
    1205             :             ListCell   *newlc;
    1206             :             PublicationRelInfo *oldrel;
    1207         402 :             bool        found = false;
    1208             :             HeapTuple   rftuple;
    1209         402 :             Node       *oldrelwhereclause = NULL;
    1210         402 :             Bitmapset  *oldcolumns = NULL;
    1211             : 
    1212             :             /* look up the cache for the old relmap */
    1213         402 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1214             :                                       ObjectIdGetDatum(oldrelid),
    1215             :                                       ObjectIdGetDatum(pubid));
    1216             : 
    1217             :             /*
    1218             :              * See if the existing relation currently has a WHERE clause or a
    1219             :              * column list. We need to compare those too.
    1220             :              */
    1221         402 :             if (HeapTupleIsValid(rftuple))
    1222             :             {
    1223         402 :                 bool        isnull = true;
    1224             :                 Datum       whereClauseDatum;
    1225             :                 Datum       columnListDatum;
    1226             : 
    1227             :                 /* Load the WHERE clause for this table. */
    1228         402 :                 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1229             :                                                    Anum_pg_publication_rel_prqual,
    1230             :                                                    &isnull);
    1231         402 :                 if (!isnull)
    1232         210 :                     oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
    1233             : 
    1234             :                 /* Transform the int2vector column list to a bitmap. */
    1235         402 :                 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1236             :                                                   Anum_pg_publication_rel_prattrs,
    1237             :                                                   &isnull);
    1238             : 
    1239         402 :                 if (!isnull)
    1240         136 :                     oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
    1241             : 
    1242         402 :                 ReleaseSysCache(rftuple);
    1243             :             }
    1244             : 
    1245         778 :             foreach(newlc, rels)
    1246             :             {
    1247             :                 PublicationRelInfo *newpubrel;
    1248             :                 Oid         newrelid;
    1249         404 :                 Bitmapset  *newcolumns = NULL;
    1250             : 
    1251         404 :                 newpubrel = (PublicationRelInfo *) lfirst(newlc);
    1252         404 :                 newrelid = RelationGetRelid(newpubrel->relation);
    1253             : 
    1254             :                 /*
    1255             :                  * Validate the column list.  If the column list or WHERE
    1256             :                  * clause changes, then the validation done here will be
    1257             :                  * duplicated inside PublicationAddTables().  The validation
    1258             :                  * is cheap enough that that seems harmless.
    1259             :                  */
    1260         404 :                 newcolumns = pub_collist_validate(newpubrel->relation,
    1261             :                                                   newpubrel->columns);
    1262             : 
    1263             :                 /*
    1264             :                  * Check if any of the new set of relations matches with the
    1265             :                  * existing relations in the publication. Additionally, if the
    1266             :                  * relation has an associated WHERE clause, check the WHERE
    1267             :                  * expressions also match. Same for the column list. Drop the
    1268             :                  * rest.
    1269             :                  */
    1270         392 :                 if (newrelid == oldrelid)
    1271             :                 {
    1272         284 :                     if (equal(oldrelwhereclause, newpubrel->whereClause) &&
    1273          80 :                         bms_equal(oldcolumns, newcolumns))
    1274             :                     {
    1275          16 :                         found = true;
    1276          16 :                         break;
    1277             :                     }
    1278             :                 }
    1279             :             }
    1280             : 
    1281             :             /*
    1282             :              * Add the non-matched relations to a list so that they can be
    1283             :              * dropped.
    1284             :              */
    1285         390 :             if (!found)
    1286             :             {
    1287         374 :                 oldrel = palloc(sizeof(PublicationRelInfo));
    1288         374 :                 oldrel->whereClause = NULL;
    1289         374 :                 oldrel->columns = NIL;
    1290         374 :                 oldrel->relation = table_open(oldrelid,
    1291             :                                               ShareUpdateExclusiveLock);
    1292         374 :                 delrels = lappend(delrels, oldrel);
    1293             :             }
    1294             :         }
    1295             : 
    1296             :         /* And drop them. */
    1297         404 :         PublicationDropTables(pubid, delrels, true);
    1298             : 
    1299             :         /*
    1300             :          * Don't bother calculating the difference for adding, we'll catch and
    1301             :          * skip existing ones when doing catalog update.
    1302             :          */
    1303         404 :         PublicationAddTables(pubid, rels, true, stmt);
    1304             : 
    1305         404 :         CloseTableList(delrels);
    1306             :     }
    1307             : 
    1308         692 :     CloseTableList(rels);
    1309             : }
    1310             : 
    1311             : /*
    1312             :  * Alter the publication schemas.
    1313             :  *
    1314             :  * Add or remove schemas to/from publication.
    1315             :  */
    1316             : static void
    1317         758 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
    1318             :                         HeapTuple tup, List *schemaidlist)
    1319             : {
    1320         758 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1321             : 
    1322             :     /*
    1323             :      * Nothing to do if no objects, except in SET: for that it is quite
    1324             :      * possible that user has not specified any schemas in which case we need
    1325             :      * to remove all the existing schemas.
    1326             :      */
    1327         758 :     if (!schemaidlist && stmt->action != AP_SetObjects)
    1328         288 :         return;
    1329             : 
    1330             :     /*
    1331             :      * Schema lock is held until the publication is altered to prevent
    1332             :      * concurrent schema deletion.
    1333             :      */
    1334         470 :     LockSchemaList(schemaidlist);
    1335         470 :     if (stmt->action == AP_AddObjects)
    1336             :     {
    1337             :         ListCell   *lc;
    1338             :         List       *reloids;
    1339             : 
    1340          28 :         reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
    1341             : 
    1342          36 :         foreach(lc, reloids)
    1343             :         {
    1344             :             HeapTuple   coltuple;
    1345             : 
    1346          14 :             coltuple = SearchSysCache2(PUBLICATIONRELMAP,
    1347             :                                        ObjectIdGetDatum(lfirst_oid(lc)),
    1348             :                                        ObjectIdGetDatum(pubform->oid));
    1349             : 
    1350          14 :             if (!HeapTupleIsValid(coltuple))
    1351           0 :                 continue;
    1352             : 
    1353             :             /*
    1354             :              * Disallow adding schema if column list is already part of the
    1355             :              * publication. See CheckPubRelationColumnList.
    1356             :              */
    1357          14 :             if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
    1358           6 :                 ereport(ERROR,
    1359             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1360             :                         errmsg("cannot add schema to publication \"%s\"",
    1361             :                                stmt->pubname),
    1362             :                         errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
    1363             : 
    1364           8 :             ReleaseSysCache(coltuple);
    1365             :         }
    1366             : 
    1367          22 :         PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
    1368             :     }
    1369         442 :     else if (stmt->action == AP_DropObjects)
    1370          38 :         PublicationDropSchemas(pubform->oid, schemaidlist, false);
    1371             :     else                        /* AP_SetObjects */
    1372             :     {
    1373         404 :         List       *oldschemaids = GetPublicationSchemas(pubform->oid);
    1374         404 :         List       *delschemas = NIL;
    1375             : 
    1376             :         /* Identify which schemas should be dropped */
    1377         404 :         delschemas = list_difference_oid(oldschemaids, schemaidlist);
    1378             : 
    1379             :         /*
    1380             :          * Schema lock is held until the publication is altered to prevent
    1381             :          * concurrent schema deletion.
    1382             :          */
    1383         404 :         LockSchemaList(delschemas);
    1384             : 
    1385             :         /* And drop them */
    1386         404 :         PublicationDropSchemas(pubform->oid, delschemas, true);
    1387             : 
    1388             :         /*
    1389             :          * Don't bother calculating the difference for adding, we'll catch and
    1390             :          * skip existing ones when doing catalog update.
    1391             :          */
    1392         404 :         PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
    1393             :     }
    1394             : }
    1395             : 
    1396             : /*
    1397             :  * Check if relations and schemas can be in a given publication and throw
    1398             :  * appropriate error if not.
    1399             :  */
    1400             : static void
    1401         932 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
    1402             :                       List *tables, List *schemaidlist)
    1403             : {
    1404         932 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1405             : 
    1406         932 :     if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
    1407          94 :         schemaidlist && !superuser())
    1408           6 :         ereport(ERROR,
    1409             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1410             :                  errmsg("must be superuser to add or set schemas")));
    1411             : 
    1412             :     /*
    1413             :      * Check that user is allowed to manipulate the publication tables in
    1414             :      * schema
    1415             :      */
    1416         926 :     if (schemaidlist && pubform->puballtables)
    1417          18 :         ereport(ERROR,
    1418             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1419             :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1420             :                         NameStr(pubform->pubname)),
    1421             :                  errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
    1422             : 
    1423             :     /* Check that user is allowed to manipulate the publication tables. */
    1424         908 :     if (tables && pubform->puballtables)
    1425          18 :         ereport(ERROR,
    1426             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1427             :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1428             :                         NameStr(pubform->pubname)),
    1429             :                  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
    1430         890 : }
    1431             : 
    1432             : /*
    1433             :  * Alter the existing publication.
    1434             :  *
    1435             :  * This is dispatcher function for AlterPublicationOptions,
    1436             :  * AlterPublicationSchemas and AlterPublicationTables.
    1437             :  */
    1438             : void
    1439        1066 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
    1440             : {
    1441             :     Relation    rel;
    1442             :     HeapTuple   tup;
    1443             :     Form_pg_publication pubform;
    1444             : 
    1445        1066 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1446             : 
    1447        1066 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
    1448             :                               CStringGetDatum(stmt->pubname));
    1449             : 
    1450        1066 :     if (!HeapTupleIsValid(tup))
    1451           0 :         ereport(ERROR,
    1452             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1453             :                  errmsg("publication \"%s\" does not exist",
    1454             :                         stmt->pubname)));
    1455             : 
    1456        1066 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1457             : 
    1458             :     /* must be owner */
    1459        1066 :     if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
    1460           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1461           0 :                        stmt->pubname);
    1462             : 
    1463        1066 :     if (stmt->options)
    1464         116 :         AlterPublicationOptions(pstate, stmt, rel, tup);
    1465             :     else
    1466             :     {
    1467         950 :         List       *relations = NIL;
    1468         950 :         List       *schemaidlist = NIL;
    1469         950 :         Oid         pubid = pubform->oid;
    1470             : 
    1471         950 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
    1472             :                                    &schemaidlist);
    1473             : 
    1474         932 :         CheckAlterPublication(stmt, tup, relations, schemaidlist);
    1475             : 
    1476         890 :         heap_freetuple(tup);
    1477             : 
    1478             :         /* Lock the publication so nobody else can do anything with it. */
    1479         890 :         LockDatabaseObject(PublicationRelationId, pubid, 0,
    1480             :                            AccessExclusiveLock);
    1481             : 
    1482             :         /*
    1483             :          * It is possible that by the time we acquire the lock on publication,
    1484             :          * concurrent DDL has removed it. We can test this by checking the
    1485             :          * existence of publication. We get the tuple again to avoid the risk
    1486             :          * of any publication option getting changed.
    1487             :          */
    1488         890 :         tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1489         890 :         if (!HeapTupleIsValid(tup))
    1490           0 :             ereport(ERROR,
    1491             :                     errcode(ERRCODE_UNDEFINED_OBJECT),
    1492             :                     errmsg("publication \"%s\" does not exist",
    1493             :                            stmt->pubname));
    1494             : 
    1495         890 :         AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
    1496             :                                schemaidlist != NIL);
    1497         758 :         AlterPublicationSchemas(stmt, tup, schemaidlist);
    1498             :     }
    1499             : 
    1500             :     /* Cleanup. */
    1501         844 :     heap_freetuple(tup);
    1502         844 :     table_close(rel, RowExclusiveLock);
    1503         844 : }
    1504             : 
    1505             : /*
    1506             :  * Remove relation from publication by mapping OID.
    1507             :  */
    1508             : void
    1509         838 : RemovePublicationRelById(Oid proid)
    1510             : {
    1511             :     Relation    rel;
    1512             :     HeapTuple   tup;
    1513             :     Form_pg_publication_rel pubrel;
    1514         838 :     List       *relids = NIL;
    1515             : 
    1516         838 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
    1517             : 
    1518         838 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
    1519             : 
    1520         838 :     if (!HeapTupleIsValid(tup))
    1521           0 :         elog(ERROR, "cache lookup failed for publication table %u",
    1522             :              proid);
    1523             : 
    1524         838 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
    1525             : 
    1526             :     /*
    1527             :      * Invalidate relcache so that publication info is rebuilt.
    1528             :      *
    1529             :      * For the partitioned tables, we must invalidate all partitions contained
    1530             :      * in the respective partition hierarchies, not just the one explicitly
    1531             :      * mentioned in the publication. This is required because we implicitly
    1532             :      * publish the child tables when the parent table is published.
    1533             :      */
    1534         838 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
    1535             :                                             pubrel->prrelid);
    1536             : 
    1537         838 :     InvalidatePublicationRels(relids);
    1538             : 
    1539         838 :     CatalogTupleDelete(rel, &tup->t_self);
    1540             : 
    1541         838 :     ReleaseSysCache(tup);
    1542             : 
    1543         838 :     table_close(rel, RowExclusiveLock);
    1544         838 : }
    1545             : 
    1546             : /*
    1547             :  * Remove the publication by mapping OID.
    1548             :  */
    1549             : void
    1550         458 : RemovePublicationById(Oid pubid)
    1551             : {
    1552             :     Relation    rel;
    1553             :     HeapTuple   tup;
    1554             :     Form_pg_publication pubform;
    1555             : 
    1556         458 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1557             : 
    1558         458 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1559         458 :     if (!HeapTupleIsValid(tup))
    1560           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1561             : 
    1562         458 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1563             : 
    1564             :     /* Invalidate relcache so that publication info is rebuilt. */
    1565         458 :     if (pubform->puballtables)
    1566          52 :         CacheInvalidateRelcacheAll();
    1567             : 
    1568         458 :     CatalogTupleDelete(rel, &tup->t_self);
    1569             : 
    1570         458 :     ReleaseSysCache(tup);
    1571             : 
    1572         458 :     table_close(rel, RowExclusiveLock);
    1573         458 : }
    1574             : 
    1575             : /*
    1576             :  * Remove schema from publication by mapping OID.
    1577             :  */
    1578             : void
    1579         192 : RemovePublicationSchemaById(Oid psoid)
    1580             : {
    1581             :     Relation    rel;
    1582             :     HeapTuple   tup;
    1583         192 :     List       *schemaRels = NIL;
    1584             :     Form_pg_publication_namespace pubsch;
    1585             : 
    1586         192 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
    1587             : 
    1588         192 :     tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
    1589             : 
    1590         192 :     if (!HeapTupleIsValid(tup))
    1591           0 :         elog(ERROR, "cache lookup failed for publication schema %u", psoid);
    1592             : 
    1593         192 :     pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
    1594             : 
    1595             :     /*
    1596             :      * Invalidate relcache so that publication info is rebuilt. See
    1597             :      * RemovePublicationRelById for why we need to consider all the
    1598             :      * partitions.
    1599             :      */
    1600         192 :     schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
    1601             :                                                PUBLICATION_PART_ALL);
    1602         192 :     InvalidatePublicationRels(schemaRels);
    1603             : 
    1604         192 :     CatalogTupleDelete(rel, &tup->t_self);
    1605             : 
    1606         192 :     ReleaseSysCache(tup);
    1607             : 
    1608         192 :     table_close(rel, RowExclusiveLock);
    1609         192 : }
    1610             : 
    1611             : /*
    1612             :  * Open relations specified by a PublicationTable list.
    1613             :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
    1614             :  * add them to a publication.
    1615             :  */
    1616             : static List *
    1617        1266 : OpenTableList(List *tables)
    1618             : {
    1619        1266 :     List       *relids = NIL;
    1620        1266 :     List       *rels = NIL;
    1621             :     ListCell   *lc;
    1622        1266 :     List       *relids_with_rf = NIL;
    1623        1266 :     List       *relids_with_collist = NIL;
    1624             : 
    1625             :     /*
    1626             :      * Open, share-lock, and check all the explicitly-specified relations
    1627             :      */
    1628        2590 :     foreach(lc, tables)
    1629             :     {
    1630        1348 :         PublicationTable *t = lfirst_node(PublicationTable, lc);
    1631        1348 :         bool        recurse = t->relation->inh;
    1632             :         Relation    rel;
    1633             :         Oid         myrelid;
    1634             :         PublicationRelInfo *pub_rel;
    1635             : 
    1636             :         /* Allow query cancel in case this takes a long time */
    1637        1348 :         CHECK_FOR_INTERRUPTS();
    1638             : 
    1639        1348 :         rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
    1640        1348 :         myrelid = RelationGetRelid(rel);
    1641             : 
    1642             :         /*
    1643             :          * Filter out duplicates if user specifies "foo, foo".
    1644             :          *
    1645             :          * Note that this algorithm is known to not be very efficient (O(N^2))
    1646             :          * but given that it only works on list of tables given to us by user
    1647             :          * it's deemed acceptable.
    1648             :          */
    1649        1348 :         if (list_member_oid(relids, myrelid))
    1650             :         {
    1651             :             /* Disallow duplicate tables if there are any with row filters. */
    1652          24 :             if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
    1653          12 :                 ereport(ERROR,
    1654             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1655             :                          errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1656             :                                 RelationGetRelationName(rel))));
    1657             : 
    1658             :             /* Disallow duplicate tables if there are any with column lists. */
    1659          12 :             if (t->columns || list_member_oid(relids_with_collist, myrelid))
    1660          12 :                 ereport(ERROR,
    1661             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1662             :                          errmsg("conflicting or redundant column lists for table \"%s\"",
    1663             :                                 RelationGetRelationName(rel))));
    1664             : 
    1665           0 :             table_close(rel, ShareUpdateExclusiveLock);
    1666           0 :             continue;
    1667             :         }
    1668             : 
    1669        1324 :         pub_rel = palloc(sizeof(PublicationRelInfo));
    1670        1324 :         pub_rel->relation = rel;
    1671        1324 :         pub_rel->whereClause = t->whereClause;
    1672        1324 :         pub_rel->columns = t->columns;
    1673        1324 :         rels = lappend(rels, pub_rel);
    1674        1324 :         relids = lappend_oid(relids, myrelid);
    1675             : 
    1676        1324 :         if (t->whereClause)
    1677         386 :             relids_with_rf = lappend_oid(relids_with_rf, myrelid);
    1678             : 
    1679        1324 :         if (t->columns)
    1680         386 :             relids_with_collist = lappend_oid(relids_with_collist, myrelid);
    1681             : 
    1682             :         /*
    1683             :          * Add children of this rel, if requested, so that they too are added
    1684             :          * to the publication.  A partitioned table can't have any inheritance
    1685             :          * children other than its partitions, which need not be explicitly
    1686             :          * added to the publication.
    1687             :          */
    1688        1324 :         if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
    1689             :         {
    1690             :             List       *children;
    1691             :             ListCell   *child;
    1692             : 
    1693        1140 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
    1694             :                                            NULL);
    1695             : 
    1696        2288 :             foreach(child, children)
    1697             :             {
    1698        1148 :                 Oid         childrelid = lfirst_oid(child);
    1699             : 
    1700             :                 /* Allow query cancel in case this takes a long time */
    1701        1148 :                 CHECK_FOR_INTERRUPTS();
    1702             : 
    1703             :                 /*
    1704             :                  * Skip duplicates if user specified both parent and child
    1705             :                  * tables.
    1706             :                  */
    1707        1148 :                 if (list_member_oid(relids, childrelid))
    1708             :                 {
    1709             :                     /*
    1710             :                      * We don't allow to specify row filter for both parent
    1711             :                      * and child table at the same time as it is not very
    1712             :                      * clear which one should be given preference.
    1713             :                      */
    1714        1140 :                     if (childrelid != myrelid &&
    1715           0 :                         (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
    1716           0 :                         ereport(ERROR,
    1717             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1718             :                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1719             :                                         RelationGetRelationName(rel))));
    1720             : 
    1721             :                     /*
    1722             :                      * We don't allow to specify column list for both parent
    1723             :                      * and child table at the same time as it is not very
    1724             :                      * clear which one should be given preference.
    1725             :                      */
    1726        1140 :                     if (childrelid != myrelid &&
    1727           0 :                         (t->columns || list_member_oid(relids_with_collist, childrelid)))
    1728           0 :                         ereport(ERROR,
    1729             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1730             :                                  errmsg("conflicting or redundant column lists for table \"%s\"",
    1731             :                                         RelationGetRelationName(rel))));
    1732             : 
    1733        1140 :                     continue;
    1734             :                 }
    1735             : 
    1736             :                 /* find_all_inheritors already got lock */
    1737           8 :                 rel = table_open(childrelid, NoLock);
    1738           8 :                 pub_rel = palloc(sizeof(PublicationRelInfo));
    1739           8 :                 pub_rel->relation = rel;
    1740             :                 /* child inherits WHERE clause from parent */
    1741           8 :                 pub_rel->whereClause = t->whereClause;
    1742             : 
    1743             :                 /* child inherits column list from parent */
    1744           8 :                 pub_rel->columns = t->columns;
    1745           8 :                 rels = lappend(rels, pub_rel);
    1746           8 :                 relids = lappend_oid(relids, childrelid);
    1747             : 
    1748           8 :                 if (t->whereClause)
    1749           2 :                     relids_with_rf = lappend_oid(relids_with_rf, childrelid);
    1750             : 
    1751           8 :                 if (t->columns)
    1752           0 :                     relids_with_collist = lappend_oid(relids_with_collist, childrelid);
    1753             :             }
    1754             :         }
    1755             :     }
    1756             : 
    1757        1242 :     list_free(relids);
    1758        1242 :     list_free(relids_with_rf);
    1759             : 
    1760        1242 :     return rels;
    1761             : }
    1762             : 
    1763             : /*
    1764             :  * Close all relations in the list.
    1765             :  */
    1766             : static void
    1767        1452 : CloseTableList(List *rels)
    1768             : {
    1769             :     ListCell   *lc;
    1770             : 
    1771        2940 :     foreach(lc, rels)
    1772             :     {
    1773             :         PublicationRelInfo *pub_rel;
    1774             : 
    1775        1488 :         pub_rel = (PublicationRelInfo *) lfirst(lc);
    1776        1488 :         table_close(pub_rel->relation, NoLock);
    1777             :     }
    1778             : 
    1779        1452 :     list_free_deep(rels);
    1780        1452 : }
    1781             : 
    1782             : /*
    1783             :  * Lock the schemas specified in the schema list in AccessShareLock mode in
    1784             :  * order to prevent concurrent schema deletion.
    1785             :  */
    1786             : static void
    1787        1008 : LockSchemaList(List *schemalist)
    1788             : {
    1789             :     ListCell   *lc;
    1790             : 
    1791        1292 :     foreach(lc, schemalist)
    1792             :     {
    1793         284 :         Oid         schemaid = lfirst_oid(lc);
    1794             : 
    1795             :         /* Allow query cancel in case this takes a long time */
    1796         284 :         CHECK_FOR_INTERRUPTS();
    1797         284 :         LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
    1798             : 
    1799             :         /*
    1800             :          * It is possible that by the time we acquire the lock on schema,
    1801             :          * concurrent DDL has removed it. We can test this by checking the
    1802             :          * existence of schema.
    1803             :          */
    1804         284 :         if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
    1805           0 :             ereport(ERROR,
    1806             :                     errcode(ERRCODE_UNDEFINED_SCHEMA),
    1807             :                     errmsg("schema with OID %u does not exist", schemaid));
    1808             :     }
    1809        1008 : }
    1810             : 
    1811             : /*
    1812             :  * Add listed tables to the publication.
    1813             :  */
    1814             : static void
    1815        1036 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
    1816             :                      AlterPublicationStmt *stmt)
    1817             : {
    1818             :     ListCell   *lc;
    1819             : 
    1820             :     Assert(!stmt || !stmt->for_all_tables);
    1821             : 
    1822        2064 :     foreach(lc, rels)
    1823             :     {
    1824        1096 :         PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
    1825        1096 :         Relation    rel = pub_rel->relation;
    1826             :         ObjectAddress obj;
    1827             : 
    1828             :         /* Must be owner of the table or superuser. */
    1829        1096 :         if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
    1830           6 :             aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
    1831           6 :                            RelationGetRelationName(rel));
    1832             : 
    1833        1090 :         obj = publication_add_relation(pubid, pub_rel, if_not_exists);
    1834        1028 :         if (stmt)
    1835             :         {
    1836         612 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1837             :                                              (Node *) stmt);
    1838             : 
    1839         612 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
    1840             :                                        obj.objectId, 0);
    1841             :         }
    1842             :     }
    1843         968 : }
    1844             : 
    1845             : /*
    1846             :  * Remove listed tables from the publication.
    1847             :  */
    1848             : static void
    1849         502 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
    1850             : {
    1851             :     ObjectAddress obj;
    1852             :     ListCell   *lc;
    1853             :     Oid         prid;
    1854             : 
    1855         962 :     foreach(lc, rels)
    1856             :     {
    1857         478 :         PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
    1858         478 :         Relation    rel = pubrel->relation;
    1859         478 :         Oid         relid = RelationGetRelid(rel);
    1860             : 
    1861         478 :         if (pubrel->columns)
    1862           0 :             ereport(ERROR,
    1863             :                     errcode(ERRCODE_SYNTAX_ERROR),
    1864             :                     errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
    1865             : 
    1866         478 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
    1867             :                                ObjectIdGetDatum(relid),
    1868             :                                ObjectIdGetDatum(pubid));
    1869         478 :         if (!OidIsValid(prid))
    1870             :         {
    1871          12 :             if (missing_ok)
    1872           0 :                 continue;
    1873             : 
    1874          12 :             ereport(ERROR,
    1875             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1876             :                      errmsg("relation \"%s\" is not part of the publication",
    1877             :                             RelationGetRelationName(rel))));
    1878             :         }
    1879             : 
    1880         466 :         if (pubrel->whereClause)
    1881           6 :             ereport(ERROR,
    1882             :                     (errcode(ERRCODE_SYNTAX_ERROR),
    1883             :                      errmsg("cannot use a WHERE clause when removing a table from a publication")));
    1884             : 
    1885         460 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
    1886         460 :         performDeletion(&obj, DROP_CASCADE, 0);
    1887             :     }
    1888         484 : }
    1889             : 
    1890             : /*
    1891             :  * Add listed schemas to the publication.
    1892             :  */
    1893             : static void
    1894         560 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
    1895             :                       AlterPublicationStmt *stmt)
    1896             : {
    1897             :     ListCell   *lc;
    1898             : 
    1899             :     Assert(!stmt || !stmt->for_all_tables);
    1900             : 
    1901         770 :     foreach(lc, schemas)
    1902             :     {
    1903         222 :         Oid         schemaid = lfirst_oid(lc);
    1904             :         ObjectAddress obj;
    1905             : 
    1906         222 :         obj = publication_add_schema(pubid, schemaid, if_not_exists);
    1907         210 :         if (stmt)
    1908             :         {
    1909          58 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1910             :                                              (Node *) stmt);
    1911             : 
    1912          58 :             InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
    1913             :                                        obj.objectId, 0);
    1914             :         }
    1915             :     }
    1916         548 : }
    1917             : 
    1918             : /*
    1919             :  * Remove listed schemas from the publication.
    1920             :  */
    1921             : static void
    1922         442 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
    1923             : {
    1924             :     ObjectAddress obj;
    1925             :     ListCell   *lc;
    1926             :     Oid         psid;
    1927             : 
    1928         492 :     foreach(lc, schemas)
    1929             :     {
    1930          56 :         Oid         schemaid = lfirst_oid(lc);
    1931             : 
    1932          56 :         psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
    1933             :                                Anum_pg_publication_namespace_oid,
    1934             :                                ObjectIdGetDatum(schemaid),
    1935             :                                ObjectIdGetDatum(pubid));
    1936          56 :         if (!OidIsValid(psid))
    1937             :         {
    1938           6 :             if (missing_ok)
    1939           0 :                 continue;
    1940             : 
    1941           6 :             ereport(ERROR,
    1942             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1943             :                      errmsg("tables from schema \"%s\" are not part of the publication",
    1944             :                             get_namespace_name(schemaid))));
    1945             :         }
    1946             : 
    1947          50 :         ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
    1948          50 :         performDeletion(&obj, DROP_CASCADE, 0);
    1949             :     }
    1950         436 : }
    1951             : 
    1952             : /*
    1953             :  * Internal workhorse for changing a publication owner
    1954             :  */
    1955             : static void
    1956          26 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    1957             : {
    1958             :     Form_pg_publication form;
    1959             : 
    1960          26 :     form = (Form_pg_publication) GETSTRUCT(tup);
    1961             : 
    1962          26 :     if (form->pubowner == newOwnerId)
    1963           2 :         return;
    1964             : 
    1965          24 :     if (!superuser())
    1966             :     {
    1967             :         AclResult   aclresult;
    1968             : 
    1969             :         /* Must be owner */
    1970          12 :         if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
    1971           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1972           0 :                            NameStr(form->pubname));
    1973             : 
    1974             :         /* Must be able to become new owner */
    1975          12 :         check_can_set_role(GetUserId(), newOwnerId);
    1976             : 
    1977             :         /* New owner must have CREATE privilege on database */
    1978          12 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
    1979          12 :         if (aclresult != ACLCHECK_OK)
    1980           0 :             aclcheck_error(aclresult, OBJECT_DATABASE,
    1981           0 :                            get_database_name(MyDatabaseId));
    1982             : 
    1983          12 :         if (form->puballtables && !superuser_arg(newOwnerId))
    1984           0 :             ereport(ERROR,
    1985             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1986             :                      errmsg("permission denied to change owner of publication \"%s\"",
    1987             :                             NameStr(form->pubname)),
    1988             :                      errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
    1989             : 
    1990          12 :         if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
    1991           6 :             ereport(ERROR,
    1992             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1993             :                      errmsg("permission denied to change owner of publication \"%s\"",
    1994             :                             NameStr(form->pubname)),
    1995             :                      errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
    1996             :     }
    1997             : 
    1998          18 :     form->pubowner = newOwnerId;
    1999          18 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    2000             : 
    2001             :     /* Update owner dependency reference */
    2002          18 :     changeDependencyOnOwner(PublicationRelationId,
    2003             :                             form->oid,
    2004             :                             newOwnerId);
    2005             : 
    2006          18 :     InvokeObjectPostAlterHook(PublicationRelationId,
    2007             :                               form->oid, 0);
    2008             : }
    2009             : 
    2010             : /*
    2011             :  * Change publication owner -- by name
    2012             :  */
    2013             : ObjectAddress
    2014          26 : AlterPublicationOwner(const char *name, Oid newOwnerId)
    2015             : {
    2016             :     Oid         subid;
    2017             :     HeapTuple   tup;
    2018             :     Relation    rel;
    2019             :     ObjectAddress address;
    2020             :     Form_pg_publication pubform;
    2021             : 
    2022          26 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2023             : 
    2024          26 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
    2025             : 
    2026          26 :     if (!HeapTupleIsValid(tup))
    2027           0 :         ereport(ERROR,
    2028             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2029             :                  errmsg("publication \"%s\" does not exist", name)));
    2030             : 
    2031          26 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    2032          26 :     subid = pubform->oid;
    2033             : 
    2034          26 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2035             : 
    2036          20 :     ObjectAddressSet(address, PublicationRelationId, subid);
    2037             : 
    2038          20 :     heap_freetuple(tup);
    2039             : 
    2040          20 :     table_close(rel, RowExclusiveLock);
    2041             : 
    2042          20 :     return address;
    2043             : }
    2044             : 
    2045             : /*
    2046             :  * Change publication owner -- by OID
    2047             :  */
    2048             : void
    2049           0 : AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
    2050             : {
    2051             :     HeapTuple   tup;
    2052             :     Relation    rel;
    2053             : 
    2054           0 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2055             : 
    2056           0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
    2057             : 
    2058           0 :     if (!HeapTupleIsValid(tup))
    2059           0 :         ereport(ERROR,
    2060             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2061             :                  errmsg("publication with OID %u does not exist", subid)));
    2062             : 
    2063           0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2064             : 
    2065           0 :     heap_freetuple(tup);
    2066             : 
    2067           0 :     table_close(rel, RowExclusiveLock);
    2068           0 : }
    2069             : 
    2070             : /*
    2071             :  * Extract the publish_generated_columns option value from a DefElem. "stored"
    2072             :  * and "none" values are accepted.
    2073             :  */
    2074             : static char
    2075          70 : defGetGeneratedColsOption(DefElem *def)
    2076             : {
    2077             :     char       *sval;
    2078             : 
    2079             :     /*
    2080             :      * If no parameter value given, assume "stored" is meant.
    2081             :      */
    2082          70 :     if (!def->arg)
    2083           6 :         return PUBLISH_GENCOLS_STORED;
    2084             : 
    2085          64 :     sval = defGetString(def);
    2086             : 
    2087          64 :     if (pg_strcasecmp(sval, "none") == 0)
    2088          22 :         return PUBLISH_GENCOLS_NONE;
    2089          42 :     if (pg_strcasecmp(sval, "stored") == 0)
    2090          36 :         return PUBLISH_GENCOLS_STORED;
    2091             : 
    2092           6 :     ereport(ERROR,
    2093             :             errcode(ERRCODE_SYNTAX_ERROR),
    2094             :             errmsg("%s requires a \"none\" or \"stored\" value",
    2095             :                    def->defname));
    2096             : 
    2097             :     return PUBLISH_GENCOLS_NONE;    /* keep compiler quiet */
    2098             : }

Generated by: LCOV version 1.14