LCOV - code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 641 692 92.6 %
Date: 2025-09-01 00:17:36 Functions: 31 32 96.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * publicationcmds.c
       4             :  *      publication manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/commands/publicationcmds.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/htup_details.h"
      18             : #include "access/table.h"
      19             : #include "access/xact.h"
      20             : #include "catalog/catalog.h"
      21             : #include "catalog/indexing.h"
      22             : #include "catalog/namespace.h"
      23             : #include "catalog/objectaccess.h"
      24             : #include "catalog/objectaddress.h"
      25             : #include "catalog/pg_database.h"
      26             : #include "catalog/pg_inherits.h"
      27             : #include "catalog/pg_namespace.h"
      28             : #include "catalog/pg_proc.h"
      29             : #include "catalog/pg_publication.h"
      30             : #include "catalog/pg_publication_namespace.h"
      31             : #include "catalog/pg_publication_rel.h"
      32             : #include "commands/defrem.h"
      33             : #include "commands/event_trigger.h"
      34             : #include "commands/publicationcmds.h"
      35             : #include "miscadmin.h"
      36             : #include "nodes/nodeFuncs.h"
      37             : #include "parser/parse_clause.h"
      38             : #include "parser/parse_collate.h"
      39             : #include "parser/parse_relation.h"
      40             : #include "rewrite/rewriteHandler.h"
      41             : #include "storage/lmgr.h"
      42             : #include "utils/acl.h"
      43             : #include "utils/builtins.h"
      44             : #include "utils/inval.h"
      45             : #include "utils/lsyscache.h"
      46             : #include "utils/rel.h"
      47             : #include "utils/syscache.h"
      48             : #include "utils/varlena.h"
      49             : 
      50             : 
      51             : /*
      52             :  * Information used to validate the columns in the row filter expression. See
      53             :  * contain_invalid_rfcolumn_walker for details.
                     :  *
                     :  * The structure is filled in by pub_rf_contains_invalid_column and handed
                     :  * to the walker as its context argument.
      54             :  */
      55             : typedef struct rf_context
      56             : {
      57             :     Bitmapset  *bms_replident;  /* bitset of replica identity columns of
                     :                                  * relid; members are attnums offset by
                     :                                  * FirstLowInvalidHeapAttributeNumber */
      58             :     bool        pubviaroot;     /* true if we are validating the parent
      59             :                                  * relation's row filter */
      60             :     Oid         relid;          /* relid of the relation whose replica
                     :                                  * identity is in bms_replident */
      61             :     Oid         parentid;       /* relid of the parent relation, i.e. the
                     :                                  * one whose row filter is walked */
      62             : } rf_context;
      63             : /*
                     :  * Forward declarations of helpers that are private to this file.
                     :  */
      64             : static List *OpenTableList(List *tables);
      65             : static void CloseTableList(List *rels);
      66             : static void LockSchemaList(List *schemalist);
      67             : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
      68             :                                  AlterPublicationStmt *stmt);
      69             : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
      70             : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
      71             :                                   AlterPublicationStmt *stmt);
      72             : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
      73             : static char defGetGeneratedColsOption(DefElem *def);
      74             : 
      75             : /*
                     :  * Parse a list of publication options (DefElem nodes).
                     :  *
                     :  * Recognized option names are "publish", "publish_via_partition_root" and
                     :  * "publish_generated_columns".  Any other name raises a syntax error, and
                     :  * naming the same option twice is reported through
                     :  * errorConflictingDefElem().
                     :  *
                     :  * All outputs are initialized before the list is scanned: the *_given
                     :  * flags to false, every action in *pubactions to true,
                     :  * *publish_via_partition_root to false and *publish_generated_columns to
                     :  * PUBLISH_GENCOLS_NONE.  A *_given flag is set to true only when the
                     :  * matching option is present in 'options'.
                     :  *
                     :  * 'pstate' is only passed on to errorConflictingDefElem().
                     :  */
      76             : static void
      77         960 : parse_publication_options(ParseState *pstate,
      78             :                           List *options,
      79             :                           bool *publish_given,
      80             :                           PublicationActions *pubactions,
      81             :                           bool *publish_via_partition_root_given,
      82             :                           bool *publish_via_partition_root,
      83             :                           bool *publish_generated_columns_given,
      84             :                           char *publish_generated_columns)
      85             : {
      86             :     ListCell   *lc;
      87             : 
      88         960 :     *publish_given = false;
      89         960 :     *publish_via_partition_root_given = false;
      90         960 :     *publish_generated_columns_given = false;
      91             : 
      92             :     /* defaults, kept for every option that is absent from the list */
      93         960 :     pubactions->pubinsert = true;
      94         960 :     pubactions->pubupdate = true;
      95         960 :     pubactions->pubdelete = true;
      96         960 :     pubactions->pubtruncate = true;
      97         960 :     *publish_via_partition_root = false;
      98         960 :     *publish_generated_columns = PUBLISH_GENCOLS_NONE;
      99             : 
     100             :     /* Parse options */
     101        1310 :     foreach(lc, options)
     102             :     {
     103         386 :         DefElem    *defel = (DefElem *) lfirst(lc);
     104             : 
     105         386 :         if (strcmp(defel->defname, "publish") == 0)
     106             :         {
     107             :             char       *publish;
     108             :             List       *publish_list;
     109             :             ListCell   *lc2;
     110             : 
     111         132 :             if (*publish_given)
     112           0 :                 errorConflictingDefElem(defel, pstate);
     113             : 
     114             :             /*
     115             :              * If publish option was given only the explicitly listed actions
     116             :              * should be published.  The value is a comma-separated list of
                     :              * action names, so start with every action switched off.
     117             :              */
     118         132 :             pubactions->pubinsert = false;
     119         132 :             pubactions->pubupdate = false;
     120         132 :             pubactions->pubdelete = false;
     121         132 :             pubactions->pubtruncate = false;
     122             : 
     123         132 :             *publish_given = true;
     124         132 :             publish = defGetString(defel);
     125             : 
     126         132 :             if (!SplitIdentifierString(publish, ',', &publish_list))
     127           0 :                 ereport(ERROR,
     128             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     129             :                          errmsg("invalid list syntax in parameter \"%s\"",
     130             :                                 "publish")));
     131             : 
     132             :             /* Switch on each action named in the list; reject unknown names. */
     133         320 :             foreach(lc2, publish_list)
     134             :             {
     135         194 :                 char       *publish_opt = (char *) lfirst(lc2);
     136             : 
     137         194 :                 if (strcmp(publish_opt, "insert") == 0)
     138         118 :                     pubactions->pubinsert = true;
     139          76 :                 else if (strcmp(publish_opt, "update") == 0)
     140          30 :                     pubactions->pubupdate = true;
     141          46 :                 else if (strcmp(publish_opt, "delete") == 0)
     142          20 :                     pubactions->pubdelete = true;
     143          26 :                 else if (strcmp(publish_opt, "truncate") == 0)
     144          20 :                     pubactions->pubtruncate = true;
     145             :                 else
     146           6 :                     ereport(ERROR,
     147             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     148             :                              errmsg("unrecognized value for publication option \"%s\": \"%s\"",
     149             :                                     "publish", publish_opt)));
     150             :             }
     151             :         }
     152         254 :         else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
     153             :         {
     154         172 :             if (*publish_via_partition_root_given)
     155           6 :                 errorConflictingDefElem(defel, pstate);
     156         166 :             *publish_via_partition_root_given = true;
     157         166 :             *publish_via_partition_root = defGetBoolean(defel);
     158             :         }
     159          82 :         else if (strcmp(defel->defname, "publish_generated_columns") == 0)
     160             :         {
     161          76 :             if (*publish_generated_columns_given)
     162           6 :                 errorConflictingDefElem(defel, pstate);
     163          70 :             *publish_generated_columns_given = true;
     164          70 :             *publish_generated_columns = defGetGeneratedColsOption(defel);
     165             :         }
     166             :         else
     167           6 :             ereport(ERROR,
     168             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     169             :                      errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
     170             :     }
     171         924 : }
     172             : 
     173             : /*
     174             :  * Convert the list of PublicationObjSpec nodes into a schema oid list
     175             :  * (*schemas) and a PublicationTable list (*rels).
                     :  *
                     :  * Entries are appended to the lists the caller passes in; an empty input
                     :  * list leaves both untouched.
                     :  *
                     :  * - PUBLICATIONOBJ_TABLE: pubobj->pubtable is appended to *rels as is.
                     :  * - PUBLICATIONOBJ_TABLES_IN_SCHEMA: pubobj->name is resolved with
                     :  *   get_namespace_oid() and the OID is added to *schemas.
                     :  * - PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA: the first schema returned by
                     :  *   fetch_search_path() is added to *schemas; an empty search path is an
                     :  *   error.
                     :  *
                     :  * Schema OIDs are added with list_append_unique_oid(), so a schema named
                     :  * more than once appears only once.  'pstate' is not used here.
     176             :  */
     177             : static void
     178        1694 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
     179             :                            List **rels, List **schemas)
     180             : {
     181             :     ListCell   *cell;
     182             :     PublicationObjSpec *pubobj;
     183             : 
     184        1694 :     if (!pubobjspec_list)
     185         104 :         return;
     186             : 
     187        3374 :     foreach(cell, pubobjspec_list)
     188             :     {
     189             :         Oid         schemaid;
     190             :         List       *search_path;
     191             : 
     192        1820 :         pubobj = (PublicationObjSpec *) lfirst(cell);
     193             : 
     194        1820 :         switch (pubobj->pubobjtype)
     195             :         {
     196        1424 :             case PUBLICATIONOBJ_TABLE:
     197        1424 :                 *rels = lappend(*rels, pubobj->pubtable);
     198        1424 :                 break;
     199         372 :             case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
     200         372 :                 schemaid = get_namespace_oid(pubobj->name, false);
     201             : 
     202             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     203         342 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     204         342 :                 break;
     205          24 :             case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
     206          24 :                 search_path = fetch_search_path(false);
     207          24 :                 if (search_path == NIL) /* nothing valid in search_path? */
     208           6 :                     ereport(ERROR,
     209             :                             errcode(ERRCODE_UNDEFINED_SCHEMA),
     210             :                             errmsg("no schema has been selected for CURRENT_SCHEMA"));
     211             : 
     212          18 :                 schemaid = linitial_oid(search_path);
     213          18 :                 list_free(search_path);
     214             : 
     215             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     216          18 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     217          18 :                 break;
     218           0 :             default:
     219             :                 /* shouldn't happen */
     220           0 :                 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
     221             :                 break;
     222             :         }
     223             :     }
     224             : }
     225             : 
     226             : /*
     227             :  * Returns true if any of the columns used in the row filter WHERE expression is
     228             :  * not part of REPLICA IDENTITY, false otherwise.
                     :  *
                     :  * This is an expression_tree_walker() callback.  Each Var's attribute
                     :  * number is tested against context->bms_replident, whose members are
                     :  * offset by FirstLowInvalidHeapAttributeNumber.  With context->pubviaroot
                     :  * set, the Var's column name is looked up in context->parentid and mapped
                     :  * to the attribute number of the same-named column in context->relid
                     :  * before the test.  A NULL node yields false.
     229             :  */
     230             : static bool
     231         258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
     232             : {
     233         258 :     if (node == NULL)
     234           0 :         return false;
     235             : 
     236         258 :     if (IsA(node, Var))
     237             :     {
     238         104 :         Var        *var = (Var *) node;
     239         104 :         AttrNumber  attnum = var->varattno;
     240             : 
     241             :         /*
     242             :          * If pubviaroot is true, we are validating the row filter of the
     243             :          * parent table, but the bitmap contains the replica identity
     244             :          * information of the child table. So, get the column number of the
     245             :          * child table as parent and child column order could be different.
     246             :          */
     247         104 :         if (context->pubviaroot)
     248             :         {
     249          16 :             char       *colname = get_attname(context->parentid, attnum, false);
     250             : 
     251          16 :             attnum = get_attnum(context->relid, colname);
     252             :         }
     253             : 
     254         104 :         if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
     255         104 :                            context->bms_replident))
     256          60 :             return true;
     257             :     }
     258             : 
     259         198 :     return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
     260             :                                   context);
     261             : }
     262             : 
     263             : /*
     264             :  * Check if all columns referenced in the filter expression are part of the
     265             :  * REPLICA IDENTITY index or not.
     266             :  *
     267             :  * Returns true if any invalid column is found.
                     :  *
                     :  * The filter is the prqual attribute of the PUBLICATIONRELMAP syscache
                     :  * entry for 'pubid' and the relation the changes are published as: the
                     :  * topmost published ancestor, if there is one, when 'pubviaroot' is true
                     :  * and 'relation' is a partition (the only case in which 'ancestors' is
                     :  * consulted), otherwise 'relation' itself.  False is returned without
                     :  * further checks when the replica identity is FULL, when no such entry
                     :  * exists, or when the entry has no row filter.
     268             :  */
     269             : bool
     270         724 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     271             :                                bool pubviaroot)
     272             : {
     273             :     HeapTuple   rftuple;
     274         724 :     Oid         relid = RelationGetRelid(relation);
     275         724 :     Oid         publish_as_relid = RelationGetRelid(relation);
     276         724 :     bool        result = false;
     277             :     Datum       rfdatum;
     278             :     bool        rfisnull;
     279             : 
     280             :     /*
     281             :      * FULL means all columns are in the REPLICA IDENTITY, so all columns are
     282             :      * allowed in the row filter and we can skip the validation.
     283             :      */
     284         724 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     285         192 :         return false;
     286             : 
     287             :     /*
     288             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     289             :      * is published via this publication as we need to use its row filter
     290             :      * expression to filter the partition's changes.
     291             :      *
     292             :      * Note that even though the row filter used is for an ancestor, the
     293             :      * REPLICA IDENTITY used will be for the actual child table.
     294             :      */
     295         532 :     if (pubviaroot && relation->rd_rel->relispartition)
     296             :     {
     297             :         publish_as_relid
     298         108 :             = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     299             : 
     300         108 :         if (!OidIsValid(publish_as_relid))
     301           6 :             publish_as_relid = relid;
     302             :     }
     303             : 
     304         532 :     rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     305             :                               ObjectIdGetDatum(publish_as_relid),
     306             :                               ObjectIdGetDatum(pubid));
     307             : 
     308         532 :     if (!HeapTupleIsValid(rftuple))
     309          56 :         return false;
     310             : 
     311         476 :     rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     312             :                               Anum_pg_publication_rel_prqual,
     313             :                               &rfisnull);
     314             : 
     315         476 :     if (!rfisnull)
     316             :     {
     317         102 :         rf_context  context = {0};
     318             :         Node       *rfnode;
     319         102 :         Bitmapset  *bms = NULL;
     320             : 
     321         102 :         context.pubviaroot = pubviaroot;
     322         102 :         context.parentid = publish_as_relid;
     323         102 :         context.relid = relid;
     324             : 
     325             :         /* Remember columns that are part of the REPLICA IDENTITY */
     326         102 :         bms = RelationGetIndexAttrBitmap(relation,
     327             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     328             : 
     329         102 :         context.bms_replident = bms;
     330         102 :         rfnode = stringToNode(TextDatumGetCString(rfdatum));
     331         102 :         result = contain_invalid_rfcolumn_walker(rfnode, &context);
     332             :     }
     333             : 
     334         476 :     ReleaseSysCache(rftuple);
     335             : 
     336         476 :     return result;
     337             : }
     338             : 
     339             : /*
     340             :  * Check for invalid columns in the publication table definition.
     341             :  *
     342             :  * This function evaluates two conditions:
     343             :  *
     344             :  * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
     345             :  *    by the column list. If any column is missing, *invalid_column_list is set
     346             :  *    to true.
     347             :  * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
     348             :  *    are published, either by being explicitly named in the column list or, if
     349             :  *    no column list is specified, by setting the option
     350             :  *    publish_generated_columns to stored. If any unpublished
     351             :  *    generated column is found, *invalid_gen_col is set to true.
     352             :  *
     353             :  * Returns true if any of the above conditions are not met.
                     :  *
                     :  * Both output flags are reset to false on entry.  'pubgencols_type' is the
                     :  * publish_generated_columns setting to validate against.  'ancestors' is
                     :  * consulted only when 'pubviaroot' is true and 'relation' is a partition;
                     :  * in that case the column list of the topmost published ancestor, if any,
                     :  * is the one that gets checked.
     354             :  */
     355             : bool
     356         916 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     357             :                             bool pubviaroot, char pubgencols_type,
     358             :                             bool *invalid_column_list,
     359             :                             bool *invalid_gen_col)
     360             : {
     361         916 :     Oid         relid = RelationGetRelid(relation);
     362         916 :     Oid         publish_as_relid = RelationGetRelid(relation);
     363             :     Bitmapset  *idattrs;
     364         916 :     Bitmapset  *columns = NULL;
     365         916 :     TupleDesc   desc = RelationGetDescr(relation);
     366             :     Publication *pub;
     367             :     int         x;
     368             : 
     369         916 :     *invalid_column_list = false;
     370         916 :     *invalid_gen_col = false;
     371             : 
     372             :     /*
     373             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     374             :      * is published via this publication as we need to use its column list for
     375             :      * the changes.
     376             :      *
     377             :      * Note that even though the column list used is for an ancestor, the
     378             :      * REPLICA IDENTITY used will be for the actual child table.
     379             :      */
     380         916 :     if (pubviaroot && relation->rd_rel->relispartition)
     381             :     {
     382         160 :         publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     383             : 
     384         160 :         if (!OidIsValid(publish_as_relid))
     385          36 :             publish_as_relid = relid;
     386             :     }
     387             : 
     388             :     /* Fetch the column list */
     389         916 :     pub = GetPublication(pubid);
     390         916 :     check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
     391             : 
     392         916 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     393             :     {
     394             :         /* With REPLICA IDENTITY FULL, no column list is allowed. */
     395         206 :         *invalid_column_list = (columns != NULL);
     396             : 
     397             :         /*
     398             :          * As we don't allow a column list with REPLICA IDENTITY FULL, the
     399             :          * publish_generated_columns option must be set to stored if the table
     400             :          * has any stored generated columns.
     401             :          */
     402         206 :         if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
     403         194 :             relation->rd_att->constr &&
     404          82 :             relation->rd_att->constr->has_generated_stored)
     405           6 :             *invalid_gen_col = true;
     406             : 
     407             :         /*
     408             :          * Virtual generated columns are currently not supported for logical
     409             :          * replication at all.
     410             :          */
     411         206 :         if (relation->rd_att->constr &&
     412          94 :             relation->rd_att->constr->has_generated_virtual)
     413          12 :             *invalid_gen_col = true;
     414             : 
     415         206 :         if (*invalid_gen_col && *invalid_column_list)   /* nothing left to find */
     416           0 :             return true;    /* NOTE(review): leaves before the
                     :                              * bms_free() calls at the end */
     417             :     }
     418             : 
     419             :     /* Remember columns that are part of the REPLICA IDENTITY */
     420         916 :     idattrs = RelationGetIndexAttrBitmap(relation,
     421             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     422             : 
     423             :     /*
     424             :      * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
     425             :      * (to handle system columns the usual way), while column list does not
     426             :      * use offset, so we can't do bms_is_subset(). Instead, we have to loop
     427             :      * over the idattrs and check all of them are in the list.
     428             :      */
     429         916 :     x = -1;
     430        1570 :     while ((x = bms_next_member(idattrs, x)) >= 0)
     431             :     {
     432         660 :         AttrNumber  attnum = (x + FirstLowInvalidHeapAttributeNumber);
     433         660 :         Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
     434             : 
     435         660 :         if (columns == NULL)
     436             :         {
     437             :             /*
     438             :              * The publish_generated_columns option must be set to stored if
     439             :              * the REPLICA IDENTITY contains any stored generated column.
     440             :              */
     441         450 :             if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
     442             :             {
     443           6 :                 *invalid_gen_col = true;
     444           6 :                 break;
     445             :             }
     446             : 
     447             :             /*
     448             :              * The equivalent setting for virtual generated columns does not
     449             :              * exist yet.
     450             :              */
     451         444 :             if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
     452             :             {
     453           0 :                 *invalid_gen_col = true;
     454           0 :                 break;
     455             :             }
     456             : 
     457             :             /* Skip validating the column list since it is not defined */
     458         444 :             continue;
     459             :         }
     460             : 
     461             :         /*
     462             :          * If pubviaroot is true, we are validating the column list of the
     463             :          * parent table, but the bitmap contains the replica identity
     464             :          * information of the child table. The parent/child attnums may not
     465             :          * match, so translate them to the parent - get the attname from the
     466             :          * child, and look it up in the parent.
     467             :          */
     468         210 :         if (pubviaroot)
     469             :         {
     470             :             /* attribute name in the child table */
     471          80 :             char       *colname = get_attname(relid, attnum, false);
     472             : 
     473             :             /*
     474             :              * Determine the attnum for the attribute name in parent (we are
     475             :              * using the column list defined on the parent).
     476             :              */
     477          80 :             attnum = get_attnum(publish_as_relid, colname);
     478             :         }
     479             : 
     480             :         /* replica identity column, not covered by the column list */
     481         210 :         *invalid_column_list |= !bms_is_member(attnum, columns);
     482             : 
     483         210 :         if (*invalid_column_list && *invalid_gen_col)
     484           0 :             break;
     485             :     }
     486             : 
     487         916 :     bms_free(columns);
     488         916 :     bms_free(idattrs);
     489             : 
     490         916 :     return *invalid_column_list || *invalid_gen_col;
     491             : }
     492             : 
     493             : /*
     494             :  * Invalidate entries in the RelationSyncCache for relations included in the
     495             :  * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
     496             :  *
     497             :  * If 'puballtables' is true, invalidate all cache entries.
                     :  *
                     :  * Otherwise the relations are collected with GetPublicationRelations() and
                     :  * GetAllSchemaPublicationRelations(), both with PUBLICATION_PART_ALL,
                     :  * merged without duplicates, and invalidated one by one.
     498             :  */
     499             : void
     500          36 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
     501             : {
     502          36 :     if (puballtables)
     503             :     {
     504           6 :         CacheInvalidateRelSyncAll();
     505             :     }
     506             :     else
     507             :     {
     508          30 :         List       *relids = NIL;
     509          30 :         List       *schemarelids = NIL;
     510             : 
     511             :         /*
     512             :          * For partitioned tables, we must invalidate all partitions and
     513             :          * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
     514             :          * a target. However, WAL records for TRUNCATE specify both a root and
     515             :          * its leaves.
     516             :          */
     517          30 :         relids = GetPublicationRelations(pubid,
     518             :                                          PUBLICATION_PART_ALL);
     519          30 :         schemarelids = GetAllSchemaPublicationRelations(pubid,
     520             :                                                         PUBLICATION_PART_ALL);
     521             : 
     522          30 :         relids = list_concat_unique_oid(relids, schemarelids);
     523             : 
     524             :         /* Invalidate the RelationSyncCache entry of every collected relation */
     525          66 :         foreach_oid(relid, relids)
     526           6 :             CacheInvalidateRelSync(relid);
     527             :     }
     528             : 
     529          36 :     return;
     530             : }
     531             : 
     532             : /*
                     :  * check_functions_in_node callback
                     :  *
                     :  * Returns true for a function that is not immutable, or whose OID is at or
                     :  * above FirstNormalObjectId (i.e. one that is treated as user-defined).
                     :  * 'context' is not used.
                     :  */
     533             : static bool
     534         436 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
     535             : {
     536         436 :     return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
     537             :             func_id >= FirstNormalObjectId);
     538             : }
     539             : 
     540             : /*
     541             :  * The row filter walker checks if the row filter expression (node) is a "simple
     542             :  * expression"; pstate is used to report the error position of an offending node.
     543             :  *
     544             :  * It allows only simple or compound expressions such as:
     545             :  * - (Var Op Const)
     546             :  * - (Var Op Var)
     547             :  * - (Var Op Const) AND/OR (Var Op Const)
     548             :  * - etc
     549             :  * (where Var is a column of the table this filter belongs to)
     550             :  *
     551             :  * The simple expression has the following restrictions:
     552             :  * - User-defined operators are not allowed;
     553             :  * - User-defined functions are not allowed;
     554             :  * - User-defined types are not allowed;
     555             :  * - User-defined collations are not allowed;
     556             :  * - Non-immutable built-in functions are not allowed;
     557             :  * - System columns are not allowed.
     558             :  *
     559             :  * NOTES
     560             :  *
     561             :  * We don't allow user-defined functions/operators/types/collations because
     562             :  * (a) if a user drops a user-defined object used in a row filter expression or
     563             :  * if there is any other error while using it, the logical decoding
     564             :  * infrastructure won't be able to recover from such an error even if the
     565             :  * object is recreated again because a historic snapshot is used to evaluate
     566             :  * the row filter;
     567             :  * (b) a user-defined function can be used to access tables that could have
     568             :  * unpleasant results because a historic snapshot is used. That's why only
     569             :  * immutable built-in functions are allowed in row filter expressions.
     570             :  *
     571             :  * We don't allow system columns because currently, we don't have that
     572             :  * information in the tuple passed to downstream. Also, as we don't replicate
     573             :  * those to subscribers, there doesn't seem to be a need for a filter on those
     574             :  * columns.
     575             :  *
     576             :  * We can allow other node types after more analysis and testing.
     577             :  */
     578             : static bool
     579        1440 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
     580             : {
     581        1440 :     char       *errdetail_msg = NULL;
     582             : 
     583        1440 :     if (node == NULL)
     584           6 :         return false;    /* empty subtree: nothing to check */
     585             : 
     586        1434 :     switch (nodeTag(node))
     587             :     {
     588         388 :         case T_Var:
     589             :             /* System columns are not allowed. */
     590         388 :             if (((Var *) node)->varattno < InvalidAttrNumber)
     591           6 :                 errdetail_msg = _("System columns are not allowed.");
     592         388 :             break;
     593         392 :         case T_OpExpr:
     594             :         case T_DistinctExpr:
     595             :         case T_NullIfExpr:
     596             :             /* OK, except user-defined operators are not allowed (all three tags are read through an OpExpr cast). */
     597         392 :             if (((OpExpr *) node)->opno >= FirstNormalObjectId)
     598           6 :                 errdetail_msg = _("User-defined operators are not allowed.");
     599         392 :             break;
     600           6 :         case T_ScalarArrayOpExpr:
     601             :             /* OK, except user-defined operators are not allowed. */
     602           6 :             if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
     603           0 :                 errdetail_msg = _("User-defined operators are not allowed.");
     604             : 
     605             :             /*
     606             :              * We don't need to check the hashfuncid and negfuncid of
     607             :              * ScalarArrayOpExpr as those functions are only built for a
     608             :              * subquery.
     609             :              */
     610           6 :             break;
     611           6 :         case T_RowCompareExpr:
     612             :             {
     613             :                 ListCell   *opid;
     614             : 
     615             :                 /* OK, except user-defined operators are not allowed; stop at the first offending operator. */
     616          18 :                 foreach(opid, ((RowCompareExpr *) node)->opnos)
     617             :                 {
     618          12 :                     if (lfirst_oid(opid) >= FirstNormalObjectId)
     619             :                     {
     620           0 :                         errdetail_msg = _("User-defined operators are not allowed.");
     621           0 :                         break;
     622             :                     }
     623             :                 }
     624             :             }
     625           6 :             break;
     626         636 :         case T_Const:
     627             :         case T_FuncExpr:
     628             :         case T_BoolExpr:
     629             :         case T_RelabelType:
     630             :         case T_CollateExpr:
     631             :         case T_CaseExpr:
     632             :         case T_CaseTestExpr:
     633             :         case T_ArrayExpr:
     634             :         case T_RowExpr:
     635             :         case T_CoalesceExpr:
     636             :         case T_MinMaxExpr:
     637             :         case T_XmlExpr:
     638             :         case T_NullTest:
     639             :         case T_BooleanTest:
     640             :         case T_List:
     641             :             /* OK, supported; the generic type/function/collation checks below still apply (except for List) */
     642         636 :             break;
     643           6 :         default:
     644           6 :             errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
     645           6 :             break;
     646             :     }
     647             : 
     648             :     /*
     649             :      * For all the supported nodes, if we haven't already found a problem,
     650             :      * check the types, functions, and collations used in it.  We check List
     651             :      * by walking through each element.
     652             :      */
     653        1434 :     if (!errdetail_msg && !IsA(node, List))
     654             :     {
     655        1362 :         if (exprType(node) >= FirstNormalObjectId)
     656           6 :             errdetail_msg = _("User-defined types are not allowed.");
     657        1356 :         else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
     658             :                                          pstate))
     659          12 :             errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
     660        2688 :         else if (exprCollation(node) >= FirstNormalObjectId ||
     661        1344 :                  exprInputCollation(node) >= FirstNormalObjectId)
     662           6 :             errdetail_msg = _("User-defined collations are not allowed.");
     663             :     }
     664             : 
     665             :     /*
     666             :      * If we found a problem in this node, throw error now. Otherwise keep
     667             :      * going: this walker is handed to expression_tree_walker and its result is returned.
     668             :      */
     669        1434 :     if (errdetail_msg)
     670          42 :         ereport(ERROR,
     671             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     672             :                  errmsg("invalid publication WHERE expression"),
     673             :                  errdetail_internal("%s", errdetail_msg),    /* errdetail_msg was already passed through _() above */
     674             :                  parser_errposition(pstate, exprLocation(node))));
     675             : 
     676        1392 :     return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
     677             :                                   pstate);
     678             : }
     679             : 
     680             : /*
     681             :  * Check if the row filter expression is a "simple expression".
     682             :  *
     683             :  * Thin wrapper: passes node and pstate to check_simple_rowfilter_expr_walker and returns its result; see that function for details.
     684             :  */
     685             : static bool
     686         376 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
     687             : {
     688         376 :     return check_simple_rowfilter_expr_walker(node, pstate);
     689             : }
     690             : 
     691             : /*
     692             :  * Transform the publication WHERE expression for all the relations in the list,
     693             :  * ensuring it is coerced to boolean and necessary collation information is
     694             :  * added if required, and add a new nsitem/RTE for the associated relation to
     695             :  * the ParseState's namespace list.
     696             :  *
     697             :  * Also check the publication row filter expression and throw an error if anything not permitted or unexpected is encountered.
     698             :  * On success, each non-NULL whereClause in 'tables' (a list of PublicationRelInfo) is replaced by its transformed form.
     699             :  */
     700             : static void
     701        1182 : TransformPubWhereClauses(List *tables, const char *queryString,
     702             :                          bool pubviaroot)
     703             : {
     704             :     ListCell   *lc;
     705             : 
     706        2378 :     foreach(lc, tables)
     707             :     {
     708             :         ParseNamespaceItem *nsitem;
     709        1256 :         Node       *whereclause = NULL;
     710             :         ParseState *pstate;
     711        1256 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     712             : 
     713        1256 :         if (pri->whereClause == NULL)
     714         862 :             continue;    /* no row filter on this table */
     715             : 
     716             :         /*
     717             :          * If the publication doesn't publish changes via the root partitioned
     718             :          * table, the partition's row filter will be used. So disallow using
     719             :          * WHERE clause on partitioned table in this case.
     720             :          */
     721         394 :         if (!pubviaroot &&
     722         364 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     723           6 :             ereport(ERROR,
     724             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     725             :                      errmsg("cannot use publication WHERE clause for relation \"%s\"",
     726             :                             RelationGetRelationName(pri->relation)),
     727             :                      errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
     728             :                                "publish_via_partition_root")));
     729             : 
     730             :         /*
     731             :          * A fresh pstate is required so that we only have "this" table in its
     732             :          * rangetable
     733             :          */
     734         388 :         pstate = make_parsestate(NULL);
     735         388 :         pstate->p_sourcetext = queryString;
     736         388 :         nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
     737             :                                                AccessShareLock, NULL,
     738             :                                                false, false);
     739         388 :         addNSItemToQuery(pstate, nsitem, false, true, true);
     740             : 
     741         388 :         whereclause = transformWhereClause(pstate,
     742         388 :                                            copyObject(pri->whereClause),    /* work on a copy; pri->whereClause is only overwritten after the checks below */
     743             :                                            EXPR_KIND_WHERE,
     744             :                                            "PUBLICATION WHERE");
     745             : 
     746             :         /* Fix up collation information */
     747         376 :         assign_expr_collations(pstate, whereclause);
     748             : 
     749         376 :         whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);    /* NOTE(review): the 1 is presumably the rangetable index of the single RTE added above -- confirm */
     750             : 
     751             :         /*
     752             :          * We allow only simple expressions in row filters. See
     753             :          * check_simple_rowfilter_expr_walker.
     754             :          */
     755         376 :         check_simple_rowfilter_expr(whereclause, pstate);
     756             : 
     757         334 :         free_parsestate(pstate);
     758             : 
     759         334 :         pri->whereClause = whereclause;    /* store the transformed and checked expression */
     760             :     }
     761        1122 : }
     762             : 
     763             : 
     764             : /*
     765             :  * Given a list of tables that are going to be added to a publication,
     766             :  * verify that they fulfill the necessary preconditions, namely: no tables
     767             :  * have a column list if any schema is published; and partitioned tables do
     768             :  * not have column lists if publish_via_partition_root is not set.
     769             :  *
     770             :  * 'pubname' is only used in error messages; 'tables' is a list of PublicationRelInfo.
     771             :  * 'publish_schema' indicates that the publication contains any TABLES IN SCHEMA elements (newly added in this command, or preexisting).
     772             :  * 'pubviaroot' is the value of publish_via_partition_root.
     773             :  */
     774             : static void
     775        1122 : CheckPubRelationColumnList(char *pubname, List *tables,
     776             :                            bool publish_schema, bool pubviaroot)
     777             : {
     778             :     ListCell   *lc;
     779             : 
     780        2288 :     foreach(lc, tables)
     781             :     {
     782        1196 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     783             : 
     784        1196 :         if (pri->columns == NIL)
     785         792 :             continue;    /* no column list on this table, nothing to verify */
     786             : 
     787             :         /*
     788             :          * Disallow specifying column list if any schema is in the
     789             :          * publication.
     790             :          *
     791             :          * XXX We could instead just forbid the case when the publication
     792             :          * tries to publish the table with a column list and a schema for that
     793             :          * table. However, if we do that then we need a restriction during
     794             :          * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
     795             :          * seem to be a good idea.
     796             :          */
     797         404 :         if (publish_schema)
     798          24 :             ereport(ERROR,
     799             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     800             :                     errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     801             :                            get_namespace_name(RelationGetNamespace(pri->relation)),
     802             :                            RelationGetRelationName(pri->relation), pubname),
     803             :                     errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
     804             : 
     805             :         /*
     806             :          * If the publication doesn't publish changes via the root partitioned
     807             :          * table, the partition's column list will be used. So disallow using
     808             :          * a column list on the partitioned table in this case.
     809             :          */
     810         380 :         if (!pubviaroot &&
     811         304 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     812           6 :             ereport(ERROR,
     813             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     814             :                      errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     815             :                             get_namespace_name(RelationGetNamespace(pri->relation)),
     816             :                             RelationGetRelationName(pri->relation), pubname),
     817             :                      errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
     818             :                                "publish_via_partition_root")));
     819             :     }
     820        1092 : }
     821             : 
     822             : /*
     823             :  * Create new publication: check privileges and name uniqueness, insert the pg_publication row, attach the listed tables/schemas, and return the new object's address.
     824             :  */
     825             : ObjectAddress
     826         856 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
     827             : {
     828             :     Relation    rel;
     829             :     ObjectAddress myself;
     830             :     Oid         puboid;
     831             :     bool        nulls[Natts_pg_publication];
     832             :     Datum       values[Natts_pg_publication];
     833             :     HeapTuple   tup;
     834             :     bool        publish_given;
     835             :     PublicationActions pubactions;
     836             :     bool        publish_via_partition_root_given;
     837             :     bool        publish_via_partition_root;
     838             :     bool        publish_generated_columns_given;
     839             :     char        publish_generated_columns;
     840             :     AclResult   aclresult;
     841         856 :     List       *relations = NIL;
     842         856 :     List       *schemaidlist = NIL;
     843             : 
     844             :     /* must have CREATE privilege on database */
     845         856 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
     846         856 :     if (aclresult != ACLCHECK_OK)
     847           6 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     848           6 :                        get_database_name(MyDatabaseId));
     849             : 
     850             :     /* FOR ALL TABLES requires superuser */
     851         850 :     if (stmt->for_all_tables && !superuser())
     852           0 :         ereport(ERROR,
     853             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     854             :                  errmsg("must be superuser to create FOR ALL TABLES publication")));
     855             : 
     856         850 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     857             : 
     858             :     /* Check if name is used */
     859         850 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     860             :                              CStringGetDatum(stmt->pubname));
     861         850 :     if (OidIsValid(puboid))
     862           6 :         ereport(ERROR,
     863             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     864             :                  errmsg("publication \"%s\" already exists",
     865             :                         stmt->pubname)));
     866             : 
     867             :     /* Form a tuple; the current user becomes the publication owner. */
     868         844 :     memset(values, 0, sizeof(values));
     869         844 :     memset(nulls, false, sizeof(nulls));
     870             : 
     871         844 :     values[Anum_pg_publication_pubname - 1] =
     872         844 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
     873         844 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
     874             : 
     875         844 :     parse_publication_options(pstate,
     876             :                               stmt->options,
     877             :                               &publish_given, &pubactions,
     878             :                               &publish_via_partition_root_given,
     879             :                               &publish_via_partition_root,
     880             :                               &publish_generated_columns_given,
     881             :                               &publish_generated_columns);
     882             : 
     883         808 :     puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
     884             :                                 Anum_pg_publication_oid);
     885         808 :     values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
     886         808 :     values[Anum_pg_publication_puballtables - 1] =
     887         808 :         BoolGetDatum(stmt->for_all_tables);
     888         808 :     values[Anum_pg_publication_pubinsert - 1] =
     889         808 :         BoolGetDatum(pubactions.pubinsert);
     890         808 :     values[Anum_pg_publication_pubupdate - 1] =
     891         808 :         BoolGetDatum(pubactions.pubupdate);
     892         808 :     values[Anum_pg_publication_pubdelete - 1] =
     893         808 :         BoolGetDatum(pubactions.pubdelete);
     894         808 :     values[Anum_pg_publication_pubtruncate - 1] =
     895         808 :         BoolGetDatum(pubactions.pubtruncate);
     896         808 :     values[Anum_pg_publication_pubviaroot - 1] =
     897         808 :         BoolGetDatum(publish_via_partition_root);
     898         808 :     values[Anum_pg_publication_pubgencols - 1] =
     899         808 :         CharGetDatum(publish_generated_columns);
     900             : 
     901         808 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     902             : 
     903             :     /* Insert tuple into catalog. */
     904         808 :     CatalogTupleInsert(rel, tup);
     905         808 :     heap_freetuple(tup);
     906             : 
     907         808 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
     908             : 
     909         808 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
     910             : 
     911             :     /* Make the changes visible. */
     912         808 :     CommandCounterIncrement();
     913             : 
     914             :     /* Associate objects with the publication. */
     915         808 :     if (stmt->for_all_tables)
     916             :     {
     917             :         /* Invalidate relcache so that publication info is rebuilt. */
     918          88 :         CacheInvalidateRelcacheAll();
     919             :     }
     920             :     else
     921             :     {
     922         720 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
     923             :                                    &schemaidlist);
     924             : 
     925             :         /* FOR TABLES IN SCHEMA requires superuser */
     926         702 :         if (schemaidlist != NIL && !superuser())
     927           6 :             ereport(ERROR,
     928             :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     929             :                     errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
     930             : 
     931         696 :         if (relations != NIL)
     932             :         {
     933             :             List       *rels;
     934             : 
     935         472 :             rels = OpenTableList(relations);
     936         442 :             TransformPubWhereClauses(rels, pstate->p_sourcetext,
     937             :                                      publish_via_partition_root);
     938             : 
     939         418 :             CheckPubRelationColumnList(stmt->pubname, rels,
     940             :                                        schemaidlist != NIL,
     941             :                                        publish_via_partition_root);
     942             : 
     943         412 :             PublicationAddTables(puboid, rels, true, NULL);
     944         386 :             CloseTableList(rels);    /* pairs with OpenTableList above */
     945             :         }
     946             : 
     947         610 :         if (schemaidlist != NIL)
     948             :         {
     949             :             /*
     950             :              * Schema lock is held until the publication is created to prevent
     951             :              * concurrent schema deletion.
     952             :              */
     953         152 :             LockSchemaList(schemaidlist);
     954         152 :             PublicationAddSchemas(puboid, schemaidlist, true, NULL);
     955             :         }
     956             :     }
     957             : 
     958         692 :     table_close(rel, RowExclusiveLock);
     959             : 
     960         692 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
     961             : 
     962         692 :     if (wal_level != WAL_LEVEL_LOGICAL)    /* not fatal: the publication is already created, so only warn */
     963         402 :         ereport(WARNING,
     964             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     965             :                  errmsg("\"wal_level\" is insufficient to publish logical changes"),
     966             :                  errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
     967             : 
     968         692 :     return myself;
     969             : }
     970             : 
     971             : /*
     972             :  * Change options of a publication.
     973             :  */
     974             : static void
     975         116 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
     976             :                         Relation rel, HeapTuple tup)
     977             : {
     978             :     bool        nulls[Natts_pg_publication];
     979             :     bool        replaces[Natts_pg_publication];
     980             :     Datum       values[Natts_pg_publication];
     981             :     bool        publish_given;
     982             :     PublicationActions pubactions;
     983             :     bool        publish_via_partition_root_given;
     984             :     bool        publish_via_partition_root;
     985             :     bool        publish_generated_columns_given;
     986             :     char        publish_generated_columns;
     987             :     ObjectAddress obj;
     988             :     Form_pg_publication pubform;
     989         116 :     List       *root_relids = NIL;
     990             :     ListCell   *lc;
     991             : 
     992         116 :     parse_publication_options(pstate,
     993             :                               stmt->options,
     994             :                               &publish_given, &pubactions,
     995             :                               &publish_via_partition_root_given,
     996             :                               &publish_via_partition_root,
     997             :                               &publish_generated_columns_given,
     998             :                               &publish_generated_columns);
     999             : 
    1000         116 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1001             : 
    1002             :     /*
    1003             :      * If the publication doesn't publish changes via the root partitioned
    1004             :      * table, the partition's row filter and column list will be used. So
    1005             :      * disallow using WHERE clause and column lists on partitioned table in
    1006             :      * this case.
    1007             :      */
    1008         116 :     if (!pubform->puballtables && publish_via_partition_root_given &&
    1009          80 :         !publish_via_partition_root)
    1010             :     {
    1011             :         /*
    1012             :          * Lock the publication so nobody else can do anything with it. This
    1013             :          * prevents concurrent alter to add partitioned table(s) with WHERE
    1014             :          * clause(s) and/or column lists which we don't allow when not
    1015             :          * publishing via root.
    1016             :          */
    1017          48 :         LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
    1018             :                            AccessShareLock);
    1019             : 
    1020          48 :         root_relids = GetPublicationRelations(pubform->oid,
    1021             :                                               PUBLICATION_PART_ROOT);
    1022             : 
    1023          84 :         foreach(lc, root_relids)
    1024             :         {
    1025          48 :             Oid         relid = lfirst_oid(lc);
    1026             :             HeapTuple   rftuple;
    1027             :             char        relkind;
    1028             :             char       *relname;
    1029             :             bool        has_rowfilter;
    1030             :             bool        has_collist;
    1031             : 
    1032             :             /*
    1033             :              * Beware: we don't have lock on the relations, so cope silently
    1034             :              * with the cache lookups returning NULL.
    1035             :              */
    1036             : 
    1037          48 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1038             :                                       ObjectIdGetDatum(relid),
    1039             :                                       ObjectIdGetDatum(pubform->oid));
    1040          48 :             if (!HeapTupleIsValid(rftuple))
    1041           0 :                 continue;
    1042          48 :             has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
    1043          48 :             has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
    1044          48 :             if (!has_rowfilter && !has_collist)
    1045             :             {
    1046          12 :                 ReleaseSysCache(rftuple);
    1047          12 :                 continue;
    1048             :             }
    1049             : 
    1050          36 :             relkind = get_rel_relkind(relid);
    1051          36 :             if (relkind != RELKIND_PARTITIONED_TABLE)
    1052             :             {
    1053          24 :                 ReleaseSysCache(rftuple);
    1054          24 :                 continue;
    1055             :             }
    1056          12 :             relname = get_rel_name(relid);
    1057          12 :             if (relname == NULL)    /* table concurrently dropped */
    1058             :             {
    1059           0 :                 ReleaseSysCache(rftuple);
    1060           0 :                 continue;
    1061             :             }
    1062             : 
    1063          12 :             if (has_rowfilter)
    1064           6 :                 ereport(ERROR,
    1065             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1066             :                          errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1067             :                                 "publish_via_partition_root",
    1068             :                                 stmt->pubname),
    1069             :                          errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1070             :                                    relname, "publish_via_partition_root")));
    1071             :             Assert(has_collist);
    1072           6 :             ereport(ERROR,
    1073             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1074             :                      errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1075             :                             "publish_via_partition_root",
    1076             :                             stmt->pubname),
    1077             :                      errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1078             :                                relname, "publish_via_partition_root")));
    1079             :         }
    1080             :     }
    1081             : 
    1082             :     /* Everything ok, form a new tuple. */
    1083         104 :     memset(values, 0, sizeof(values));
    1084         104 :     memset(nulls, false, sizeof(nulls));
    1085         104 :     memset(replaces, false, sizeof(replaces));
    1086             : 
    1087         104 :     if (publish_given)
    1088             :     {
    1089          28 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
    1090          28 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
    1091             : 
    1092          28 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
    1093          28 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
    1094             : 
    1095          28 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
    1096          28 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
    1097             : 
    1098          28 :         values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
    1099          28 :         replaces[Anum_pg_publication_pubtruncate - 1] = true;
    1100             :     }
    1101             : 
    1102         104 :     if (publish_via_partition_root_given)
    1103             :     {
    1104          70 :         values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
    1105          70 :         replaces[Anum_pg_publication_pubviaroot - 1] = true;
    1106             :     }
    1107             : 
    1108         104 :     if (publish_generated_columns_given)
    1109             :     {
    1110           6 :         values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
    1111           6 :         replaces[Anum_pg_publication_pubgencols - 1] = true;
    1112             :     }
    1113             : 
    1114         104 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1115             :                             replaces);
    1116             : 
    1117             :     /* Update the catalog. */
    1118         104 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1119             : 
    1120         104 :     CommandCounterIncrement();
    1121             : 
    1122         104 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1123             : 
    1124             :     /* Invalidate the relcache. */
    1125         104 :     if (pubform->puballtables)
    1126             :     {
    1127           8 :         CacheInvalidateRelcacheAll();
    1128             :     }
    1129             :     else
    1130             :     {
    1131          96 :         List       *relids = NIL;
    1132          96 :         List       *schemarelids = NIL;
    1133             : 
    1134             :         /*
    1135             :          * For any partitioned tables contained in the publication, we must
    1136             :          * invalidate all partitions contained in the respective partition
    1137             :          * trees, not just those explicitly mentioned in the publication.
    1138             :          */
    1139          96 :         if (root_relids == NIL)
    1140          60 :             relids = GetPublicationRelations(pubform->oid,
    1141             :                                              PUBLICATION_PART_ALL);
    1142             :         else
    1143             :         {
    1144             :             /*
    1145             :              * We already got tables explicitly mentioned in the publication.
    1146             :              * Now get all partitions for the partitioned table in the list.
    1147             :              */
    1148          72 :             foreach(lc, root_relids)
    1149          36 :                 relids = GetPubPartitionOptionRelations(relids,
    1150             :                                                         PUBLICATION_PART_ALL,
    1151             :                                                         lfirst_oid(lc));
    1152             :         }
    1153             : 
    1154          96 :         schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
    1155             :                                                         PUBLICATION_PART_ALL);
    1156          96 :         relids = list_concat_unique_oid(relids, schemarelids);
    1157             : 
    1158          96 :         InvalidatePublicationRels(relids);
    1159             :     }
    1160             : 
    1161         104 :     ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
    1162         104 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1163             :                                      (Node *) stmt);
    1164             : 
    1165         104 :     InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
    1166         104 : }
    1167             : 
    1168             : /*
    1169             :  * Invalidate the relations.
    1170             :  */
    1171             : void
    1172        2440 : InvalidatePublicationRels(List *relids)
    1173             : {
    1174             :     /*
    1175             :      * We don't want to send too many individual messages, at some point it's
    1176             :      * cheaper to just reset whole relcache.
    1177             :      */
    1178        2440 :     if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
    1179             :     {
    1180             :         ListCell   *lc;
    1181             : 
    1182       20584 :         foreach(lc, relids)
    1183       18144 :             CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
    1184             :     }
    1185             :     else
    1186           0 :         CacheInvalidateRelcacheAll();
    1187        2440 : }
    1188             : 
    1189             : /*
    1190             :  * Add or remove table to/from publication.
    1191             :  */
    1192             : static void
    1193         914 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
    1194             :                        List *tables, const char *queryString,
    1195             :                        bool publish_schema)
    1196             : {
    1197         914 :     List       *rels = NIL;
    1198         914 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1199         914 :     Oid         pubid = pubform->oid;
    1200             : 
    1201             :     /*
    1202             :      * Nothing to do if no objects, except in SET: for that it is quite
    1203             :      * possible that user has not specified any tables in which case we need
    1204             :      * to remove all the existing tables.
    1205             :      */
    1206         914 :     if (!tables && stmt->action != AP_SetObjects)
    1207          76 :         return;
    1208             : 
    1209         838 :     rels = OpenTableList(tables);
    1210             : 
    1211         838 :     if (stmt->action == AP_AddObjects)
    1212             :     {
    1213         294 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1214             : 
    1215         276 :         publish_schema |= is_schema_publication(pubid);
    1216             : 
    1217         276 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1218         276 :                                    pubform->pubviaroot);
    1219             : 
    1220         264 :         PublicationAddTables(pubid, rels, false, stmt);
    1221             :     }
    1222         544 :     else if (stmt->action == AP_DropObjects)
    1223          98 :         PublicationDropTables(pubid, rels, false);
    1224             :     else                        /* AP_SetObjects */
    1225             :     {
    1226         446 :         List       *oldrelids = GetPublicationRelations(pubid,
    1227             :                                                         PUBLICATION_PART_ROOT);
    1228         446 :         List       *delrels = NIL;
    1229             :         ListCell   *oldlc;
    1230             : 
    1231         446 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1232             : 
    1233         428 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1234         428 :                                    pubform->pubviaroot);
    1235             : 
    1236             :         /*
    1237             :          * To recreate the relation list for the publication, look for
    1238             :          * existing relations that do not need to be dropped.
    1239             :          */
    1240         806 :         foreach(oldlc, oldrelids)
    1241             :         {
    1242         402 :             Oid         oldrelid = lfirst_oid(oldlc);
    1243             :             ListCell   *newlc;
    1244             :             PublicationRelInfo *oldrel;
    1245         402 :             bool        found = false;
    1246             :             HeapTuple   rftuple;
    1247         402 :             Node       *oldrelwhereclause = NULL;
    1248         402 :             Bitmapset  *oldcolumns = NULL;
    1249             : 
    1250             :             /* look up the cache for the old relmap */
    1251         402 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1252             :                                       ObjectIdGetDatum(oldrelid),
    1253             :                                       ObjectIdGetDatum(pubid));
    1254             : 
    1255             :             /*
    1256             :              * See if the existing relation currently has a WHERE clause or a
    1257             :              * column list. We need to compare those too.
    1258             :              */
    1259         402 :             if (HeapTupleIsValid(rftuple))
    1260             :             {
    1261         402 :                 bool        isnull = true;
    1262             :                 Datum       whereClauseDatum;
    1263             :                 Datum       columnListDatum;
    1264             : 
    1265             :                 /* Load the WHERE clause for this table. */
    1266         402 :                 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1267             :                                                    Anum_pg_publication_rel_prqual,
    1268             :                                                    &isnull);
    1269         402 :                 if (!isnull)
    1270         210 :                     oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
    1271             : 
    1272             :                 /* Transform the int2vector column list to a bitmap. */
    1273         402 :                 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1274             :                                                   Anum_pg_publication_rel_prattrs,
    1275             :                                                   &isnull);
    1276             : 
    1277         402 :                 if (!isnull)
    1278         136 :                     oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
    1279             : 
    1280         402 :                 ReleaseSysCache(rftuple);
    1281             :             }
    1282             : 
    1283         778 :             foreach(newlc, rels)
    1284             :             {
    1285             :                 PublicationRelInfo *newpubrel;
    1286             :                 Oid         newrelid;
    1287         404 :                 Bitmapset  *newcolumns = NULL;
    1288             : 
    1289         404 :                 newpubrel = (PublicationRelInfo *) lfirst(newlc);
    1290         404 :                 newrelid = RelationGetRelid(newpubrel->relation);
    1291             : 
    1292             :                 /*
    1293             :                  * Validate the column list.  If the column list or WHERE
    1294             :                  * clause changes, then the validation done here will be
    1295             :                  * duplicated inside PublicationAddTables().  The validation
    1296             :                  * is cheap enough that that seems harmless.
    1297             :                  */
    1298         404 :                 newcolumns = pub_collist_validate(newpubrel->relation,
    1299             :                                                   newpubrel->columns);
    1300             : 
    1301             :                 /*
    1302             :                  * Check if any of the new set of relations matches with the
    1303             :                  * existing relations in the publication. Additionally, if the
    1304             :                  * relation has an associated WHERE clause, check the WHERE
    1305             :                  * expressions also match. Same for the column list. Drop the
    1306             :                  * rest.
    1307             :                  */
    1308         392 :                 if (newrelid == oldrelid)
    1309             :                 {
    1310         284 :                     if (equal(oldrelwhereclause, newpubrel->whereClause) &&
    1311          80 :                         bms_equal(oldcolumns, newcolumns))
    1312             :                     {
    1313          16 :                         found = true;
    1314          16 :                         break;
    1315             :                     }
    1316             :                 }
    1317             :             }
    1318             : 
    1319             :             /*
    1320             :              * Add the non-matched relations to a list so that they can be
    1321             :              * dropped.
    1322             :              */
    1323         390 :             if (!found)
    1324             :             {
    1325         374 :                 oldrel = palloc(sizeof(PublicationRelInfo));
    1326         374 :                 oldrel->whereClause = NULL;
    1327         374 :                 oldrel->columns = NIL;
    1328         374 :                 oldrel->relation = table_open(oldrelid,
    1329             :                                               ShareUpdateExclusiveLock);
    1330         374 :                 delrels = lappend(delrels, oldrel);
    1331             :             }
    1332             :         }
    1333             : 
    1334             :         /* And drop them. */
    1335         404 :         PublicationDropTables(pubid, delrels, true);
    1336             : 
    1337             :         /*
    1338             :          * Don't bother calculating the difference for adding, we'll catch and
    1339             :          * skip existing ones when doing catalog update.
    1340             :          */
    1341         404 :         PublicationAddTables(pubid, rels, true, stmt);
    1342             : 
    1343         404 :         CloseTableList(delrels);
    1344             :     }
    1345             : 
    1346         706 :     CloseTableList(rels);
    1347             : }
    1348             : 
    1349             : /*
    1350             :  * Alter the publication schemas.
    1351             :  *
    1352             :  * Add or remove schemas to/from publication.
    1353             :  */
    1354             : static void
    1355         782 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
    1356             :                         HeapTuple tup, List *schemaidlist)
    1357             : {
    1358         782 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1359             : 
    1360             :     /*
    1361             :      * Nothing to do if no objects, except in SET: for that it is quite
    1362             :      * possible that user has not specified any schemas in which case we need
    1363             :      * to remove all the existing schemas.
    1364             :      */
    1365         782 :     if (!schemaidlist && stmt->action != AP_SetObjects)
    1366         302 :         return;
    1367             : 
    1368             :     /*
    1369             :      * Schema lock is held until the publication is altered to prevent
    1370             :      * concurrent schema deletion.
    1371             :      */
    1372         480 :     LockSchemaList(schemaidlist);
    1373         480 :     if (stmt->action == AP_AddObjects)
    1374             :     {
    1375             :         ListCell   *lc;
    1376             :         List       *reloids;
    1377             : 
    1378          38 :         reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
    1379             : 
    1380          54 :         foreach(lc, reloids)
    1381             :         {
    1382             :             HeapTuple   coltuple;
    1383             : 
    1384          22 :             coltuple = SearchSysCache2(PUBLICATIONRELMAP,
    1385             :                                        ObjectIdGetDatum(lfirst_oid(lc)),
    1386             :                                        ObjectIdGetDatum(pubform->oid));
    1387             : 
    1388          22 :             if (!HeapTupleIsValid(coltuple))
    1389           0 :                 continue;
    1390             : 
    1391             :             /*
    1392             :              * Disallow adding schema if column list is already part of the
    1393             :              * publication. See CheckPubRelationColumnList.
    1394             :              */
    1395          22 :             if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
    1396           6 :                 ereport(ERROR,
    1397             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1398             :                         errmsg("cannot add schema to publication \"%s\"",
    1399             :                                stmt->pubname),
    1400             :                         errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
    1401             : 
    1402          16 :             ReleaseSysCache(coltuple);
    1403             :         }
    1404             : 
    1405          32 :         PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
    1406             :     }
    1407         442 :     else if (stmt->action == AP_DropObjects)
    1408          38 :         PublicationDropSchemas(pubform->oid, schemaidlist, false);
    1409             :     else                        /* AP_SetObjects */
    1410             :     {
    1411         404 :         List       *oldschemaids = GetPublicationSchemas(pubform->oid);
    1412         404 :         List       *delschemas = NIL;
    1413             : 
    1414             :         /* Identify which schemas should be dropped */
    1415         404 :         delschemas = list_difference_oid(oldschemaids, schemaidlist);
    1416             : 
    1417             :         /*
    1418             :          * Schema lock is held until the publication is altered to prevent
    1419             :          * concurrent schema deletion.
    1420             :          */
    1421         404 :         LockSchemaList(delschemas);
    1422             : 
    1423             :         /* And drop them */
    1424         404 :         PublicationDropSchemas(pubform->oid, delschemas, true);
    1425             : 
    1426             :         /*
    1427             :          * Don't bother calculating the difference for adding, we'll catch and
    1428             :          * skip existing ones when doing catalog update.
    1429             :          */
    1430         404 :         PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
    1431             :     }
    1432             : }
    1433             : 
    1434             : /*
    1435             :  * Check if relations and schemas can be in a given publication and throw
    1436             :  * appropriate error if not.
    1437             :  */
    1438             : static void
    1439         956 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
    1440             :                       List *tables, List *schemaidlist)
    1441             : {
    1442         956 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1443             : 
    1444         956 :     if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
    1445         104 :         schemaidlist && !superuser())
    1446           6 :         ereport(ERROR,
    1447             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1448             :                  errmsg("must be superuser to add or set schemas")));
    1449             : 
    1450             :     /*
    1451             :      * Check that user is allowed to manipulate the publication tables in
    1452             :      * schema
    1453             :      */
    1454         950 :     if (schemaidlist && pubform->puballtables)
    1455          18 :         ereport(ERROR,
    1456             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1457             :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1458             :                         NameStr(pubform->pubname)),
    1459             :                  errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
    1460             : 
    1461             :     /* Check that user is allowed to manipulate the publication tables. */
    1462         932 :     if (tables && pubform->puballtables)
    1463          18 :         ereport(ERROR,
    1464             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1465             :                  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1466             :                         NameStr(pubform->pubname)),
    1467             :                  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
    1468         914 : }
    1469             : 
    1470             : /*
    1471             :  * Alter the existing publication.
    1472             :  *
    1473             :  * This is dispatcher function for AlterPublicationOptions,
    1474             :  * AlterPublicationSchemas and AlterPublicationTables.
    1475             :  */
    1476             : void
    1477        1090 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
    1478             : {
    1479             :     Relation    rel;
    1480             :     HeapTuple   tup;
    1481             :     Form_pg_publication pubform;
    1482             : 
    1483        1090 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1484             : 
    1485        1090 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
    1486             :                               CStringGetDatum(stmt->pubname));
    1487             : 
    1488        1090 :     if (!HeapTupleIsValid(tup))
    1489           0 :         ereport(ERROR,
    1490             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1491             :                  errmsg("publication \"%s\" does not exist",
    1492             :                         stmt->pubname)));
    1493             : 
    1494        1090 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1495             : 
    1496             :     /* must be owner */
    1497        1090 :     if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
    1498           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1499           0 :                        stmt->pubname);
    1500             : 
    1501        1090 :     if (stmt->options)
    1502         116 :         AlterPublicationOptions(pstate, stmt, rel, tup);
    1503             :     else
    1504             :     {
    1505         974 :         List       *relations = NIL;
    1506         974 :         List       *schemaidlist = NIL;
    1507         974 :         Oid         pubid = pubform->oid;
    1508             : 
    1509         974 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
    1510             :                                    &schemaidlist);
    1511             : 
    1512         956 :         CheckAlterPublication(stmt, tup, relations, schemaidlist);
    1513             : 
    1514         914 :         heap_freetuple(tup);
    1515             : 
    1516             :         /* Lock the publication so nobody else can do anything with it. */
    1517         914 :         LockDatabaseObject(PublicationRelationId, pubid, 0,
    1518             :                            AccessExclusiveLock);
    1519             : 
    1520             :         /*
    1521             :          * It is possible that by the time we acquire the lock on publication,
    1522             :          * concurrent DDL has removed it. We can test this by checking the
    1523             :          * existence of publication. We get the tuple again to avoid the risk
    1524             :          * of any publication option getting changed.
    1525             :          */
    1526         914 :         tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1527         914 :         if (!HeapTupleIsValid(tup))
    1528           0 :             ereport(ERROR,
    1529             :                     errcode(ERRCODE_UNDEFINED_OBJECT),
    1530             :                     errmsg("publication \"%s\" does not exist",
    1531             :                            stmt->pubname));
    1532             : 
    1533         914 :         AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
    1534             :                                schemaidlist != NIL);
    1535         782 :         AlterPublicationSchemas(stmt, tup, schemaidlist);
    1536             :     }
    1537             : 
    1538             :     /* Cleanup. */
    1539         868 :     heap_freetuple(tup);
    1540         868 :     table_close(rel, RowExclusiveLock);
    1541         868 : }
    1542             : 
    1543             : /*
    1544             :  * Remove relation from publication by mapping OID.
    1545             :  */
    1546             : void
    1547         844 : RemovePublicationRelById(Oid proid)
    1548             : {
    1549             :     Relation    rel;
    1550             :     HeapTuple   tup;
    1551             :     Form_pg_publication_rel pubrel;
    1552         844 :     List       *relids = NIL;
    1553             : 
    1554         844 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
    1555             : 
    1556         844 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
    1557             : 
    1558         844 :     if (!HeapTupleIsValid(tup))
    1559           0 :         elog(ERROR, "cache lookup failed for publication table %u",
    1560             :              proid);
    1561             : 
    1562         844 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
    1563             : 
    1564             :     /*
    1565             :      * Invalidate relcache so that publication info is rebuilt.
    1566             :      *
    1567             :      * For the partitioned tables, we must invalidate all partitions contained
    1568             :      * in the respective partition hierarchies, not just the one explicitly
    1569             :      * mentioned in the publication. This is required because we implicitly
    1570             :      * publish the child tables when the parent table is published.
    1571             :      */
    1572         844 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
    1573             :                                             pubrel->prrelid);
    1574             : 
    1575         844 :     InvalidatePublicationRels(relids);
    1576             : 
    1577         844 :     CatalogTupleDelete(rel, &tup->t_self);
    1578             : 
    1579         844 :     ReleaseSysCache(tup);
    1580             : 
    1581         844 :     table_close(rel, RowExclusiveLock);
    1582         844 : }
    1583             : 
    1584             : /*
    1585             :  * Remove the publication by mapping OID.
    1586             :  */
    1587             : void
    1588         470 : RemovePublicationById(Oid pubid)
    1589             : {
    1590             :     Relation    rel;
    1591             :     HeapTuple   tup;
    1592             :     Form_pg_publication pubform;
    1593             : 
    1594         470 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1595             : 
    1596         470 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1597         470 :     if (!HeapTupleIsValid(tup))
    1598           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1599             : 
    1600         470 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1601             : 
    1602             :     /* Invalidate relcache so that publication info is rebuilt. */
    1603         470 :     if (pubform->puballtables)
    1604          54 :         CacheInvalidateRelcacheAll();
    1605             : 
    1606         470 :     CatalogTupleDelete(rel, &tup->t_self);
    1607             : 
    1608         470 :     ReleaseSysCache(tup);
    1609             : 
    1610         470 :     table_close(rel, RowExclusiveLock);
    1611         470 : }
    1612             : 
    1613             : /*
    1614             :  * Remove schema from publication by mapping OID.
    1615             :  */
    1616             : void
    1617         192 : RemovePublicationSchemaById(Oid psoid)
    1618             : {
    1619             :     Relation    rel;
    1620             :     HeapTuple   tup;
    1621         192 :     List       *schemaRels = NIL;
    1622             :     Form_pg_publication_namespace pubsch;
    1623             : 
    1624         192 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
    1625             : 
    1626         192 :     tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
    1627             : 
    1628         192 :     if (!HeapTupleIsValid(tup))
    1629           0 :         elog(ERROR, "cache lookup failed for publication schema %u", psoid);
    1630             : 
    1631         192 :     pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
    1632             : 
    1633             :     /*
    1634             :      * Invalidate relcache so that publication info is rebuilt. See
    1635             :      * RemovePublicationRelById for why we need to consider all the
    1636             :      * partitions.
    1637             :      */
    1638         192 :     schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
    1639             :                                                PUBLICATION_PART_ALL);
    1640         192 :     InvalidatePublicationRels(schemaRels);
    1641             : 
    1642         192 :     CatalogTupleDelete(rel, &tup->t_self);
    1643             : 
    1644         192 :     ReleaseSysCache(tup);
    1645             : 
    1646         192 :     table_close(rel, RowExclusiveLock);
    1647         192 : }
    1648             : 
    1649             : /*
    1650             :  * Open relations specified by a PublicationTable list.
    1651             :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
    1652             :  * add them to a publication.
    1653             :  */
    1654             : static List *
    1655        1310 : OpenTableList(List *tables)
    1656             : {
    1657        1310 :     List       *relids = NIL;
    1658        1310 :     List       *rels = NIL;
    1659             :     ListCell   *lc;
    1660        1310 :     List       *relids_with_rf = NIL;
    1661        1310 :     List       *relids_with_collist = NIL;
    1662             : 
    1663             :     /*
    1664             :      * Open, share-lock, and check all the explicitly-specified relations
    1665             :      */
    1666        2686 :     foreach(lc, tables)
    1667             :     {
    1668        1406 :         PublicationTable *t = lfirst_node(PublicationTable, lc);
    1669        1406 :         bool        recurse = t->relation->inh;
    1670             :         Relation    rel;
    1671             :         Oid         myrelid;
    1672             :         PublicationRelInfo *pub_rel;
    1673             : 
    1674             :         /* Allow query cancel in case this takes a long time */
    1675        1406 :         CHECK_FOR_INTERRUPTS();
    1676             : 
    1677        1406 :         rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
    1678        1400 :         myrelid = RelationGetRelid(rel);
    1679             : 
    1680             :         /*
    1681             :          * Filter out duplicates if user specifies "foo, foo".
    1682             :          *
    1683             :          * Note that this algorithm is known to not be very efficient (O(N^2))
    1684             :          * but given that it only works on list of tables given to us by user
    1685             :          * it's deemed acceptable.
    1686             :          */
    1687        1400 :         if (list_member_oid(relids, myrelid))
    1688             :         {
    1689             :             /* Disallow duplicate tables if there are any with row filters. */
    1690          24 :             if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
    1691          12 :                 ereport(ERROR,
    1692             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1693             :                          errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1694             :                                 RelationGetRelationName(rel))));
    1695             : 
    1696             :             /* Disallow duplicate tables if there are any with column lists. */
    1697          12 :             if (t->columns || list_member_oid(relids_with_collist, myrelid))
    1698          12 :                 ereport(ERROR,
    1699             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1700             :                          errmsg("conflicting or redundant column lists for table \"%s\"",
    1701             :                                 RelationGetRelationName(rel))));
    1702             : 
    1703           0 :             table_close(rel, ShareUpdateExclusiveLock);
    1704           0 :             continue;
    1705             :         }
    1706             : 
    1707        1376 :         pub_rel = palloc(sizeof(PublicationRelInfo));
    1708        1376 :         pub_rel->relation = rel;
    1709        1376 :         pub_rel->whereClause = t->whereClause;
    1710        1376 :         pub_rel->columns = t->columns;
    1711        1376 :         rels = lappend(rels, pub_rel);
    1712        1376 :         relids = lappend_oid(relids, myrelid);
    1713             : 
    1714        1376 :         if (t->whereClause)
    1715         404 :             relids_with_rf = lappend_oid(relids_with_rf, myrelid);
    1716             : 
    1717        1376 :         if (t->columns)
    1718         410 :             relids_with_collist = lappend_oid(relids_with_collist, myrelid);
    1719             : 
    1720             :         /*
    1721             :          * Add children of this rel, if requested, so that they too are added
    1722             :          * to the publication.  A partitioned table can't have any inheritance
    1723             :          * children other than its partitions, which need not be explicitly
    1724             :          * added to the publication.
    1725             :          */
    1726        1376 :         if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
    1727             :         {
    1728             :             List       *children;
    1729             :             ListCell   *child;
    1730             : 
    1731        1152 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
    1732             :                                            NULL);
    1733             : 
    1734        2312 :             foreach(child, children)
    1735             :             {
    1736        1160 :                 Oid         childrelid = lfirst_oid(child);
    1737             : 
    1738             :                 /* Allow query cancel in case this takes a long time */
    1739        1160 :                 CHECK_FOR_INTERRUPTS();
    1740             : 
    1741             :                 /*
    1742             :                  * Skip duplicates if user specified both parent and child
    1743             :                  * tables.
    1744             :                  */
    1745        1160 :                 if (list_member_oid(relids, childrelid))
    1746             :                 {
    1747             :                     /*
    1748             :                      * We don't allow to specify row filter for both parent
    1749             :                      * and child table at the same time as it is not very
    1750             :                      * clear which one should be given preference.
    1751             :                      */
    1752        1152 :                     if (childrelid != myrelid &&
    1753           0 :                         (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
    1754           0 :                         ereport(ERROR,
    1755             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1756             :                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1757             :                                         RelationGetRelationName(rel))));
    1758             : 
    1759             :                     /*
    1760             :                      * We don't allow to specify column list for both parent
    1761             :                      * and child table at the same time as it is not very
    1762             :                      * clear which one should be given preference.
    1763             :                      */
    1764        1152 :                     if (childrelid != myrelid &&
    1765           0 :                         (t->columns || list_member_oid(relids_with_collist, childrelid)))
    1766           0 :                         ereport(ERROR,
    1767             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1768             :                                  errmsg("conflicting or redundant column lists for table \"%s\"",
    1769             :                                         RelationGetRelationName(rel))));
    1770             : 
    1771        1152 :                     continue;
    1772             :                 }
    1773             : 
    1774             :                 /* find_all_inheritors already got lock */
    1775           8 :                 rel = table_open(childrelid, NoLock);
    1776           8 :                 pub_rel = palloc(sizeof(PublicationRelInfo));
    1777           8 :                 pub_rel->relation = rel;
    1778             :                 /* child inherits WHERE clause from parent */
    1779           8 :                 pub_rel->whereClause = t->whereClause;
    1780             : 
    1781             :                 /* child inherits column list from parent */
    1782           8 :                 pub_rel->columns = t->columns;
    1783           8 :                 rels = lappend(rels, pub_rel);
    1784           8 :                 relids = lappend_oid(relids, childrelid);
    1785             : 
    1786           8 :                 if (t->whereClause)
    1787           2 :                     relids_with_rf = lappend_oid(relids_with_rf, childrelid);
    1788             : 
    1789           8 :                 if (t->columns)
    1790           0 :                     relids_with_collist = lappend_oid(relids_with_collist, childrelid);
    1791             :             }
    1792             :         }
    1793             :     }
    1794             : 
    1795        1280 :     list_free(relids);
    1796        1280 :     list_free(relids_with_rf);
    1797             : 
    1798        1280 :     return rels;
    1799             : }
    1800             : 
    1801             : /*
    1802             :  * Close all relations in the list.
    1803             :  */
    1804             : static void
    1805        1496 : CloseTableList(List *rels)
    1806             : {
    1807             :     ListCell   *lc;
    1808             : 
    1809        3042 :     foreach(lc, rels)
    1810             :     {
    1811             :         PublicationRelInfo *pub_rel;
    1812             : 
    1813        1546 :         pub_rel = (PublicationRelInfo *) lfirst(lc);
    1814        1546 :         table_close(pub_rel->relation, NoLock);
    1815             :     }
    1816             : 
    1817        1496 :     list_free_deep(rels);
    1818        1496 : }
    1819             : 
    1820             : /*
    1821             :  * Lock the schemas specified in the schema list in AccessShareLock mode in
    1822             :  * order to prevent concurrent schema deletion.
    1823             :  */
    1824             : static void
    1825        1036 : LockSchemaList(List *schemalist)
    1826             : {
    1827             :     ListCell   *lc;
    1828             : 
    1829        1360 :     foreach(lc, schemalist)
    1830             :     {
    1831         324 :         Oid         schemaid = lfirst_oid(lc);
    1832             : 
    1833             :         /* Allow query cancel in case this takes a long time */
    1834         324 :         CHECK_FOR_INTERRUPTS();
    1835         324 :         LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
    1836             : 
    1837             :         /*
    1838             :          * It is possible that by the time we acquire the lock on schema,
    1839             :          * concurrent DDL has removed it. We can test this by checking the
    1840             :          * existence of schema.
    1841             :          */
    1842         324 :         if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
    1843           0 :             ereport(ERROR,
    1844             :                     errcode(ERRCODE_UNDEFINED_SCHEMA),
    1845             :                     errmsg("schema with OID %u does not exist", schemaid));
    1846             :     }
    1847        1036 : }
    1848             : 
    1849             : /*
    1850             :  * Add listed tables to the publication.
    1851             :  */
    1852             : static void
    1853        1080 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
    1854             :                      AlterPublicationStmt *stmt)
    1855             : {
    1856             :     ListCell   *lc;
    1857             : 
    1858             :     Assert(!stmt || !stmt->for_all_tables);
    1859             : 
    1860        2166 :     foreach(lc, rels)
    1861             :     {
    1862        1154 :         PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
    1863        1154 :         Relation    rel = pub_rel->relation;
    1864             :         ObjectAddress obj;
    1865             : 
    1866             :         /* Must be owner of the table or superuser. */
    1867        1154 :         if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
    1868           6 :             aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
    1869           6 :                            RelationGetRelationName(rel));
    1870             : 
    1871        1148 :         obj = publication_add_relation(pubid, pub_rel, if_not_exists);
    1872        1086 :         if (stmt)
    1873             :         {
    1874         626 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1875             :                                              (Node *) stmt);
    1876             : 
    1877         626 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
    1878             :                                        obj.objectId, 0);
    1879             :         }
    1880             :     }
    1881        1012 : }
    1882             : 
    1883             : /*
    1884             :  * Remove listed tables from the publication.
    1885             :  */
    1886             : static void
    1887         502 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
    1888             : {
    1889             :     ObjectAddress obj;
    1890             :     ListCell   *lc;
    1891             :     Oid         prid;
    1892             : 
    1893         962 :     foreach(lc, rels)
    1894             :     {
    1895         478 :         PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
    1896         478 :         Relation    rel = pubrel->relation;
    1897         478 :         Oid         relid = RelationGetRelid(rel);
    1898             : 
    1899         478 :         if (pubrel->columns)
    1900           0 :             ereport(ERROR,
    1901             :                     errcode(ERRCODE_SYNTAX_ERROR),
    1902             :                     errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
    1903             : 
    1904         478 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
    1905             :                                ObjectIdGetDatum(relid),
    1906             :                                ObjectIdGetDatum(pubid));
    1907         478 :         if (!OidIsValid(prid))
    1908             :         {
    1909          12 :             if (missing_ok)
    1910           0 :                 continue;
    1911             : 
    1912          12 :             ereport(ERROR,
    1913             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1914             :                      errmsg("relation \"%s\" is not part of the publication",
    1915             :                             RelationGetRelationName(rel))));
    1916             :         }
    1917             : 
    1918         466 :         if (pubrel->whereClause)
    1919           6 :             ereport(ERROR,
    1920             :                     (errcode(ERRCODE_SYNTAX_ERROR),
    1921             :                      errmsg("cannot use a WHERE clause when removing a table from a publication")));
    1922             : 
    1923         460 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
    1924         460 :         performDeletion(&obj, DROP_CASCADE, 0);
    1925             :     }
    1926         484 : }
    1927             : 
    1928             : /*
    1929             :  * Add listed schemas to the publication.
    1930             :  */
    1931             : static void
    1932         588 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
    1933             :                       AlterPublicationStmt *stmt)
    1934             : {
    1935             :     ListCell   *lc;
    1936             : 
    1937             :     Assert(!stmt || !stmt->for_all_tables);
    1938             : 
    1939         838 :     foreach(lc, schemas)
    1940             :     {
    1941         262 :         Oid         schemaid = lfirst_oid(lc);
    1942             :         ObjectAddress obj;
    1943             : 
    1944         262 :         obj = publication_add_schema(pubid, schemaid, if_not_exists);
    1945         250 :         if (stmt)
    1946             :         {
    1947          68 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1948             :                                              (Node *) stmt);
    1949             : 
    1950          68 :             InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
    1951             :                                        obj.objectId, 0);
    1952             :         }
    1953             :     }
    1954         576 : }
    1955             : 
    1956             : /*
    1957             :  * Remove listed schemas from the publication.
    1958             :  */
    1959             : static void
    1960         442 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
    1961             : {
    1962             :     ObjectAddress obj;
    1963             :     ListCell   *lc;
    1964             :     Oid         psid;
    1965             : 
    1966         492 :     foreach(lc, schemas)
    1967             :     {
    1968          56 :         Oid         schemaid = lfirst_oid(lc);
    1969             : 
    1970          56 :         psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
    1971             :                                Anum_pg_publication_namespace_oid,
    1972             :                                ObjectIdGetDatum(schemaid),
    1973             :                                ObjectIdGetDatum(pubid));
    1974          56 :         if (!OidIsValid(psid))
    1975             :         {
    1976           6 :             if (missing_ok)
    1977           0 :                 continue;
    1978             : 
    1979           6 :             ereport(ERROR,
    1980             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1981             :                      errmsg("tables from schema \"%s\" are not part of the publication",
    1982             :                             get_namespace_name(schemaid))));
    1983             :         }
    1984             : 
    1985          50 :         ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
    1986          50 :         performDeletion(&obj, DROP_CASCADE, 0);
    1987             :     }
    1988         436 : }
    1989             : 
    1990             : /*
    1991             :  * Internal workhorse for changing a publication owner
    1992             :  */
    1993             : static void
    1994          36 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    1995             : {
    1996             :     Form_pg_publication form;
    1997             : 
    1998          36 :     form = (Form_pg_publication) GETSTRUCT(tup);
    1999             : 
    2000          36 :     if (form->pubowner == newOwnerId)
    2001          12 :         return;
    2002             : 
    2003          24 :     if (!superuser())
    2004             :     {
    2005             :         AclResult   aclresult;
    2006             : 
    2007             :         /* Must be owner */
    2008          12 :         if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
    2009           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    2010           0 :                            NameStr(form->pubname));
    2011             : 
    2012             :         /* Must be able to become new owner */
    2013          12 :         check_can_set_role(GetUserId(), newOwnerId);
    2014             : 
    2015             :         /* New owner must have CREATE privilege on database */
    2016          12 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
    2017          12 :         if (aclresult != ACLCHECK_OK)
    2018           0 :             aclcheck_error(aclresult, OBJECT_DATABASE,
    2019           0 :                            get_database_name(MyDatabaseId));
    2020             : 
    2021          12 :         if (form->puballtables && !superuser_arg(newOwnerId))
    2022           0 :             ereport(ERROR,
    2023             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2024             :                      errmsg("permission denied to change owner of publication \"%s\"",
    2025             :                             NameStr(form->pubname)),
    2026             :                      errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
    2027             : 
    2028          12 :         if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
    2029           6 :             ereport(ERROR,
    2030             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2031             :                      errmsg("permission denied to change owner of publication \"%s\"",
    2032             :                             NameStr(form->pubname)),
    2033             :                      errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
    2034             :     }
    2035             : 
    2036          18 :     form->pubowner = newOwnerId;
    2037          18 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    2038             : 
    2039             :     /* Update owner dependency reference */
    2040          18 :     changeDependencyOnOwner(PublicationRelationId,
    2041             :                             form->oid,
    2042             :                             newOwnerId);
    2043             : 
    2044          18 :     InvokeObjectPostAlterHook(PublicationRelationId,
    2045             :                               form->oid, 0);
    2046             : }
    2047             : 
    2048             : /*
    2049             :  * Change publication owner -- by name
    2050             :  */
    2051             : ObjectAddress
    2052          36 : AlterPublicationOwner(const char *name, Oid newOwnerId)
    2053             : {
    2054             :     Oid         pubid;
    2055             :     HeapTuple   tup;
    2056             :     Relation    rel;
    2057             :     ObjectAddress address;
    2058             :     Form_pg_publication pubform;
    2059             : 
    2060          36 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2061             : 
    2062          36 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
    2063             : 
    2064          36 :     if (!HeapTupleIsValid(tup))
    2065           0 :         ereport(ERROR,
    2066             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2067             :                  errmsg("publication \"%s\" does not exist", name)));
    2068             : 
    2069          36 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    2070          36 :     pubid = pubform->oid;
    2071             : 
    2072          36 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2073             : 
    2074          30 :     ObjectAddressSet(address, PublicationRelationId, pubid);
    2075             : 
    2076          30 :     heap_freetuple(tup);
    2077             : 
    2078          30 :     table_close(rel, RowExclusiveLock);
    2079             : 
    2080          30 :     return address;
    2081             : }
    2082             : 
    2083             : /*
    2084             :  * Change publication owner -- by OID
    2085             :  */
    2086             : void
    2087           0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
    2088             : {
    2089             :     HeapTuple   tup;
    2090             :     Relation    rel;
    2091             : 
    2092           0 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2093             : 
    2094           0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    2095             : 
    2096           0 :     if (!HeapTupleIsValid(tup))
    2097           0 :         ereport(ERROR,
    2098             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2099             :                  errmsg("publication with OID %u does not exist", pubid)));
    2100             : 
    2101           0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2102             : 
    2103           0 :     heap_freetuple(tup);
    2104             : 
    2105           0 :     table_close(rel, RowExclusiveLock);
    2106           0 : }
    2107             : 
    2108             : /*
    2109             :  * Extract the publish_generated_columns option value from a DefElem. "stored"
    2110             :  * and "none" values are accepted.
    2111             :  */
    2112             : static char
    2113          70 : defGetGeneratedColsOption(DefElem *def)
    2114             : {
    2115          70 :     char       *sval = "";
    2116             : 
    2117             :     /*
    2118             :      * A parameter value is required.
    2119             :      */
    2120          70 :     if (def->arg)
    2121             :     {
    2122          64 :         sval = defGetString(def);
    2123             : 
    2124          64 :         if (pg_strcasecmp(sval, "none") == 0)
    2125          22 :             return PUBLISH_GENCOLS_NONE;
    2126          42 :         if (pg_strcasecmp(sval, "stored") == 0)
    2127          36 :             return PUBLISH_GENCOLS_STORED;
    2128             :     }
    2129             : 
    2130          12 :     ereport(ERROR,
    2131             :             errcode(ERRCODE_SYNTAX_ERROR),
    2132             :             errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
    2133             :             errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
    2134             : 
    2135             :     return PUBLISH_GENCOLS_NONE;    /* keep compiler quiet */
    2136             : }

Generated by: LCOV version 1.16