LCOV - code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 656 710 92.4 %
Date: 2026-02-07 01:18:26 Functions: 31 32 96.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * publicationcmds.c
       4             :  *      publication manipulation
       5             :  *
       6             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *      src/backend/commands/publicationcmds.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/htup_details.h"
      18             : #include "access/table.h"
      19             : #include "access/xact.h"
      20             : #include "catalog/catalog.h"
      21             : #include "catalog/indexing.h"
      22             : #include "catalog/namespace.h"
      23             : #include "catalog/objectaccess.h"
      24             : #include "catalog/objectaddress.h"
      25             : #include "catalog/pg_database.h"
      26             : #include "catalog/pg_inherits.h"
      27             : #include "catalog/pg_namespace.h"
      28             : #include "catalog/pg_proc.h"
      29             : #include "catalog/pg_publication.h"
      30             : #include "catalog/pg_publication_namespace.h"
      31             : #include "catalog/pg_publication_rel.h"
      32             : #include "commands/defrem.h"
      33             : #include "commands/event_trigger.h"
      34             : #include "commands/publicationcmds.h"
      35             : #include "miscadmin.h"
      36             : #include "nodes/nodeFuncs.h"
      37             : #include "parser/parse_clause.h"
      38             : #include "parser/parse_collate.h"
      39             : #include "parser/parse_relation.h"
      40             : #include "rewrite/rewriteHandler.h"
      41             : #include "storage/lmgr.h"
      42             : #include "utils/acl.h"
      43             : #include "utils/builtins.h"
      44             : #include "utils/inval.h"
      45             : #include "utils/lsyscache.h"
      46             : #include "utils/rel.h"
      47             : #include "utils/syscache.h"
      48             : #include "utils/varlena.h"
      49             : 
      50             : 
      51             : /*
      52             :  * Information used to validate the columns in the row filter expression. See
      53             :  * contain_invalid_rfcolumn_walker for details.
      54             :  */
      55             : typedef struct rf_context
      56             : {
      57             :     Bitmapset  *bms_replident;  /* bitset of replica identity columns */
      58             :     bool        pubviaroot;     /* true if we are validating the parent
      59             :                                  * relation's row filter */
      60             :     Oid         relid;          /* relid of the relation */
      61             :     Oid         parentid;       /* relid of the parent relation */
      62             : } rf_context;
      63             : 
      64             : static List *OpenTableList(List *tables);
      65             : static void CloseTableList(List *rels);
      66             : static void LockSchemaList(List *schemalist);
      67             : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
      68             :                                  AlterPublicationStmt *stmt);
      69             : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
      70             : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
      71             :                                   AlterPublicationStmt *stmt);
      72             : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
      73             : static char defGetGeneratedColsOption(DefElem *def);
      74             : 
      75             : 
      76             : static void
      77        1012 : parse_publication_options(ParseState *pstate,
      78             :                           List *options,
      79             :                           bool *publish_given,
      80             :                           PublicationActions *pubactions,
      81             :                           bool *publish_via_partition_root_given,
      82             :                           bool *publish_via_partition_root,
      83             :                           bool *publish_generated_columns_given,
      84             :                           char *publish_generated_columns)
      85             : {
      86             :     ListCell   *lc;
      87             : 
      88        1012 :     *publish_given = false;
      89        1012 :     *publish_via_partition_root_given = false;
      90        1012 :     *publish_generated_columns_given = false;
      91             : 
      92             :     /* defaults */
      93        1012 :     pubactions->pubinsert = true;
      94        1012 :     pubactions->pubupdate = true;
      95        1012 :     pubactions->pubdelete = true;
      96        1012 :     pubactions->pubtruncate = true;
      97        1012 :     *publish_via_partition_root = false;
      98        1012 :     *publish_generated_columns = PUBLISH_GENCOLS_NONE;
      99             : 
     100             :     /* Parse options */
     101        1376 :     foreach(lc, options)
     102             :     {
     103         400 :         DefElem    *defel = (DefElem *) lfirst(lc);
     104             : 
     105         400 :         if (strcmp(defel->defname, "publish") == 0)
     106             :         {
     107             :             char       *publish;
     108             :             List       *publish_list;
     109             :             ListCell   *lc2;
     110             : 
     111         140 :             if (*publish_given)
     112           0 :                 errorConflictingDefElem(defel, pstate);
     113             : 
     114             :             /*
     115             :              * If publish option was given only the explicitly listed actions
     116             :              * should be published.
     117             :              */
     118         140 :             pubactions->pubinsert = false;
     119         140 :             pubactions->pubupdate = false;
     120         140 :             pubactions->pubdelete = false;
     121         140 :             pubactions->pubtruncate = false;
     122             : 
     123         140 :             *publish_given = true;
     124             : 
     125             :             /*
     126             :              * SplitIdentifierString destructively modifies its input, so make
     127             :              * a copy so we don't modify the memory of the executing statement
     128             :              */
     129         140 :             publish = pstrdup(defGetString(defel));
     130             : 
     131         140 :             if (!SplitIdentifierString(publish, ',', &publish_list))
     132           0 :                 ereport(ERROR,
     133             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     134             :                          errmsg("invalid list syntax in parameter \"%s\"",
     135             :                                 "publish")));
     136             : 
     137             :             /* Process the option list. */
     138         334 :             foreach(lc2, publish_list)
     139             :             {
     140         200 :                 char       *publish_opt = (char *) lfirst(lc2);
     141             : 
     142         200 :                 if (strcmp(publish_opt, "insert") == 0)
     143         124 :                     pubactions->pubinsert = true;
     144          76 :                 else if (strcmp(publish_opt, "update") == 0)
     145          30 :                     pubactions->pubupdate = true;
     146          46 :                 else if (strcmp(publish_opt, "delete") == 0)
     147          20 :                     pubactions->pubdelete = true;
     148          26 :                 else if (strcmp(publish_opt, "truncate") == 0)
     149          20 :                     pubactions->pubtruncate = true;
     150             :                 else
     151           6 :                     ereport(ERROR,
     152             :                             (errcode(ERRCODE_SYNTAX_ERROR),
     153             :                              errmsg("unrecognized value for publication option \"%s\": \"%s\"",
     154             :                                     "publish", publish_opt)));
     155             :             }
     156             :         }
     157         260 :         else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
     158             :         {
     159         172 :             if (*publish_via_partition_root_given)
     160           6 :                 errorConflictingDefElem(defel, pstate);
     161         166 :             *publish_via_partition_root_given = true;
     162         166 :             *publish_via_partition_root = defGetBoolean(defel);
     163             :         }
     164          88 :         else if (strcmp(defel->defname, "publish_generated_columns") == 0)
     165             :         {
     166          82 :             if (*publish_generated_columns_given)
     167           6 :                 errorConflictingDefElem(defel, pstate);
     168          76 :             *publish_generated_columns_given = true;
     169          76 :             *publish_generated_columns = defGetGeneratedColsOption(defel);
     170             :         }
     171             :         else
     172           6 :             ereport(ERROR,
     173             :                     (errcode(ERRCODE_SYNTAX_ERROR),
     174             :                      errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
     175             :     }
     176         976 : }
     177             : 
     178             : /*
     179             :  * Convert the PublicationObjSpecType list into schema oid list and
     180             :  * PublicationTable list.
     181             :  */
     182             : static void
     183        1700 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
     184             :                            List **rels, List **schemas)
     185             : {
     186             :     ListCell   *cell;
     187             :     PublicationObjSpec *pubobj;
     188             : 
     189        1700 :     if (!pubobjspec_list)
     190         104 :         return;
     191             : 
     192        3388 :     foreach(cell, pubobjspec_list)
     193             :     {
     194             :         Oid         schemaid;
     195             :         List       *search_path;
     196             : 
     197        1828 :         pubobj = (PublicationObjSpec *) lfirst(cell);
     198             : 
     199        1828 :         switch (pubobj->pubobjtype)
     200             :         {
     201        1432 :             case PUBLICATIONOBJ_TABLE:
     202        1432 :                 *rels = lappend(*rels, pubobj->pubtable);
     203        1432 :                 break;
     204         372 :             case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
     205         372 :                 schemaid = get_namespace_oid(pubobj->name, false);
     206             : 
     207             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     208         342 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     209         342 :                 break;
     210          24 :             case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
     211          24 :                 search_path = fetch_search_path(false);
     212          24 :                 if (search_path == NIL) /* nothing valid in search_path? */
     213           6 :                     ereport(ERROR,
     214             :                             errcode(ERRCODE_UNDEFINED_SCHEMA),
     215             :                             errmsg("no schema has been selected for CURRENT_SCHEMA"));
     216             : 
     217          18 :                 schemaid = linitial_oid(search_path);
     218          18 :                 list_free(search_path);
     219             : 
     220             :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     221          18 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     222          18 :                 break;
     223           0 :             default:
     224             :                 /* shouldn't happen */
     225           0 :                 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
     226             :                 break;
     227             :         }
     228             :     }
     229             : }
     230             : 
     231             : /*
     232             :  * Returns true if any of the columns used in the row filter WHERE expression is
     233             :  * not part of REPLICA IDENTITY, false otherwise.
     234             :  */
     235             : static bool
     236         258 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
     237             : {
     238         258 :     if (node == NULL)
     239           0 :         return false;
     240             : 
     241         258 :     if (IsA(node, Var))
     242             :     {
     243         104 :         Var        *var = (Var *) node;
     244         104 :         AttrNumber  attnum = var->varattno;
     245             : 
     246             :         /*
     247             :          * If pubviaroot is true, we are validating the row filter of the
     248             :          * parent table, but the bitmap contains the replica identity
     249             :          * information of the child table. So, get the column number of the
     250             :          * child table as parent and child column order could be different.
     251             :          */
     252         104 :         if (context->pubviaroot)
     253             :         {
     254          16 :             char       *colname = get_attname(context->parentid, attnum, false);
     255             : 
     256          16 :             attnum = get_attnum(context->relid, colname);
     257             :         }
     258             : 
     259         104 :         if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
     260         104 :                            context->bms_replident))
     261          60 :             return true;
     262             :     }
     263             : 
     264         198 :     return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
     265             :                                   context);
     266             : }
     267             : 
     268             : /*
     269             :  * Check if all columns referenced in the filter expression are part of the
     270             :  * REPLICA IDENTITY index or not.
     271             :  *
     272             :  * Returns true if any invalid column is found.
     273             :  */
     274             : bool
     275         726 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     276             :                                bool pubviaroot)
     277             : {
     278             :     HeapTuple   rftuple;
     279         726 :     Oid         relid = RelationGetRelid(relation);
     280         726 :     Oid         publish_as_relid = RelationGetRelid(relation);
     281         726 :     bool        result = false;
     282             :     Datum       rfdatum;
     283             :     bool        rfisnull;
     284             : 
     285             :     /*
     286             :      * FULL means all columns are in the REPLICA IDENTITY, so all columns are
     287             :      * allowed in the row filter and we can skip the validation.
     288             :      */
     289         726 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     290         198 :         return false;
     291             : 
     292             :     /*
     293             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     294             :      * is published via this publication as we need to use its row filter
     295             :      * expression to filter the partition's changes.
     296             :      *
     297             :      * Note that even though the row filter used is for an ancestor, the
     298             :      * REPLICA IDENTITY used will be for the actual child table.
     299             :      */
     300         528 :     if (pubviaroot && relation->rd_rel->relispartition)
     301             :     {
     302             :         publish_as_relid
     303         108 :             = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     304             : 
     305         108 :         if (!OidIsValid(publish_as_relid))
     306           6 :             publish_as_relid = relid;
     307             :     }
     308             : 
     309         528 :     rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     310             :                               ObjectIdGetDatum(publish_as_relid),
     311             :                               ObjectIdGetDatum(pubid));
     312             : 
     313         528 :     if (!HeapTupleIsValid(rftuple))
     314          56 :         return false;
     315             : 
     316         472 :     rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     317             :                               Anum_pg_publication_rel_prqual,
     318             :                               &rfisnull);
     319             : 
     320         472 :     if (!rfisnull)
     321             :     {
     322         102 :         rf_context  context = {0};
     323             :         Node       *rfnode;
     324         102 :         Bitmapset  *bms = NULL;
     325             : 
     326         102 :         context.pubviaroot = pubviaroot;
     327         102 :         context.parentid = publish_as_relid;
     328         102 :         context.relid = relid;
     329             : 
     330             :         /* Remember columns that are part of the REPLICA IDENTITY */
     331         102 :         bms = RelationGetIndexAttrBitmap(relation,
     332             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     333             : 
     334         102 :         context.bms_replident = bms;
     335         102 :         rfnode = stringToNode(TextDatumGetCString(rfdatum));
     336         102 :         result = contain_invalid_rfcolumn_walker(rfnode, &context);
     337             :     }
     338             : 
     339         472 :     ReleaseSysCache(rftuple);
     340             : 
     341         472 :     return result;
     342             : }
     343             : 
     344             : /*
     345             :  * Check for invalid columns in the publication table definition.
     346             :  *
     347             :  * This function evaluates two conditions:
     348             :  *
     349             :  * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
     350             :  *    by the column list. If any column is missing, *invalid_column_list is set
     351             :  *    to true.
     352             :  * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
     353             :  *    are published, either by being explicitly named in the column list or, if
     354             :  *    no column list is specified, by setting the option
     355             :  *    publish_generated_columns to stored. If any unpublished
     356             :  *    generated column is found, *invalid_gen_col is set to true.
     357             :  *
     358             :  * Returns true if any of the above conditions are not met.
     359             :  */
     360             : bool
     361         942 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     362             :                             bool pubviaroot, char pubgencols_type,
     363             :                             bool *invalid_column_list,
     364             :                             bool *invalid_gen_col)
     365             : {
     366         942 :     Oid         relid = RelationGetRelid(relation);
     367         942 :     Oid         publish_as_relid = RelationGetRelid(relation);
     368             :     Bitmapset  *idattrs;
     369         942 :     Bitmapset  *columns = NULL;
     370         942 :     TupleDesc   desc = RelationGetDescr(relation);
     371             :     Publication *pub;
     372             :     int         x;
     373             : 
     374         942 :     *invalid_column_list = false;
     375         942 :     *invalid_gen_col = false;
     376             : 
     377             :     /*
     378             :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     379             :      * is published via this publication as we need to use its column list for
     380             :      * the changes.
     381             :      *
     382             :      * Note that even though the column list used is for an ancestor, the
     383             :      * REPLICA IDENTITY used will be for the actual child table.
     384             :      */
     385         942 :     if (pubviaroot && relation->rd_rel->relispartition)
     386             :     {
     387         160 :         publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     388             : 
     389         160 :         if (!OidIsValid(publish_as_relid))
     390          36 :             publish_as_relid = relid;
     391             :     }
     392             : 
     393             :     /* Fetch the column list */
     394         942 :     pub = GetPublication(pubid);
     395         942 :     check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
     396             : 
     397         942 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     398             :     {
     399             :         /* With REPLICA IDENTITY FULL, no column list is allowed. */
     400         212 :         *invalid_column_list = (columns != NULL);
     401             : 
     402             :         /*
     403             :          * As we don't allow a column list with REPLICA IDENTITY FULL, the
     404             :          * publish_generated_columns option must be set to stored if the table
     405             :          * has any stored generated columns.
     406             :          */
     407         212 :         if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
     408         200 :             relation->rd_att->constr &&
     409          88 :             relation->rd_att->constr->has_generated_stored)
     410           6 :             *invalid_gen_col = true;
     411             : 
     412             :         /*
     413             :          * Virtual generated columns are currently not supported for logical
     414             :          * replication at all.
     415             :          */
     416         212 :         if (relation->rd_att->constr &&
     417         100 :             relation->rd_att->constr->has_generated_virtual)
     418          12 :             *invalid_gen_col = true;
     419             : 
     420         212 :         if (*invalid_gen_col && *invalid_column_list)
     421           0 :             return true;
     422             :     }
     423             : 
     424             :     /* Remember columns that are part of the REPLICA IDENTITY */
     425         942 :     idattrs = RelationGetIndexAttrBitmap(relation,
     426             :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     427             : 
     428             :     /*
     429             :      * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
     430             :      * (to handle system columns the usual way), while column list does not
     431             :      * use offset, so we can't do bms_is_subset(). Instead, we have to loop
     432             :      * over the idattrs and check all of them are in the list.
     433             :      */
     434         942 :     x = -1;
     435        1598 :     while ((x = bms_next_member(idattrs, x)) >= 0)
     436             :     {
     437         662 :         AttrNumber  attnum = (x + FirstLowInvalidHeapAttributeNumber);
     438         662 :         Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
     439             : 
     440         662 :         if (columns == NULL)
     441             :         {
     442             :             /*
     443             :              * The publish_generated_columns option must be set to stored if
     444             :              * the REPLICA IDENTITY contains any stored generated column.
     445             :              */
     446         452 :             if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
     447             :             {
     448           6 :                 *invalid_gen_col = true;
     449           6 :                 break;
     450             :             }
     451             : 
     452             :             /*
     453             :              * The equivalent setting for virtual generated columns does not
     454             :              * exist yet.
     455             :              */
     456         446 :             if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
     457             :             {
     458           0 :                 *invalid_gen_col = true;
     459           0 :                 break;
     460             :             }
     461             : 
     462             :             /* Skip validating the column list since it is not defined */
     463         446 :             continue;
     464             :         }
     465             : 
     466             :         /*
     467             :          * If pubviaroot is true, we are validating the column list of the
     468             :          * parent table, but the bitmap contains the replica identity
     469             :          * information of the child table. The parent/child attnums may not
     470             :          * match, so translate them to the parent - get the attname from the
     471             :          * child, and look it up in the parent.
     472             :          */
     473         210 :         if (pubviaroot)
     474             :         {
     475             :             /* attribute name in the child table */
     476          80 :             char       *colname = get_attname(relid, attnum, false);
     477             : 
     478             :             /*
     479             :              * Determine the attnum for the attribute name in parent (we are
     480             :              * using the column list defined on the parent).
     481             :              */
     482          80 :             attnum = get_attnum(publish_as_relid, colname);
     483             :         }
     484             : 
     485             :         /* replica identity column, not covered by the column list */
     486         210 :         *invalid_column_list |= !bms_is_member(attnum, columns);
     487             : 
     488         210 :         if (*invalid_column_list && *invalid_gen_col)
     489           0 :             break;
     490             :     }
     491             : 
     492         942 :     bms_free(columns);
     493         942 :     bms_free(idattrs);
     494             : 
     495         942 :     return *invalid_column_list || *invalid_gen_col;
     496             : }
     497             : 
     498             : /*
     499             :  * Invalidate entries in the RelationSyncCache for relations included in the
     500             :  * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
     501             :  *
     502             :  * If 'puballtables' is true, invalidate all cache entries.
     503             :  */
     504             : void
     505          36 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
     506             : {
     507          36 :     if (puballtables)
     508             :     {
     509           6 :         CacheInvalidateRelSyncAll();
     510             :     }
     511             :     else
     512             :     {
     513          30 :         List       *relids = NIL;
     514          30 :         List       *schemarelids = NIL;
     515             : 
     516             :         /*
     517             :          * For partitioned tables, we must invalidate all partitions and
     518             :          * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
     519             :          * a target. However, WAL records for TRUNCATE specify both a root and
     520             :          * its leaves.
     521             :          */
     522          30 :         relids = GetPublicationRelations(pubid,
     523             :                                          PUBLICATION_PART_ALL);
     524          30 :         schemarelids = GetAllSchemaPublicationRelations(pubid,
     525             :                                                         PUBLICATION_PART_ALL);
     526             : 
     527          30 :         relids = list_concat_unique_oid(relids, schemarelids);
     528             : 
     529             :         /* Invalidate the relsyncache */
     530          66 :         foreach_oid(relid, relids)
     531           6 :             CacheInvalidateRelSync(relid);
     532             :     }
     533             : 
     534          36 :     return;
     535             : }
     536             : 
     537             : /* check_functions_in_node callback */
     538             : static bool
     539         436 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
     540             : {
     541         436 :     return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
     542             :             func_id >= FirstNormalObjectId);
     543             : }
     544             : 
     545             : /*
     546             :  * The row filter walker checks if the row filter expression is a "simple
     547             :  * expression".
     548             :  *
     549             :  * It allows only simple or compound expressions such as:
     550             :  * - (Var Op Const)
     551             :  * - (Var Op Var)
     552             :  * - (Var Op Const) AND/OR (Var Op Const)
     553             :  * - etc
     554             :  * (where Var is a column of the table this filter belongs to)
     555             :  *
     556             :  * The simple expression has the following restrictions:
     557             :  * - User-defined operators are not allowed;
     558             :  * - User-defined functions are not allowed;
     559             :  * - User-defined types are not allowed;
     560             :  * - User-defined collations are not allowed;
     561             :  * - Non-immutable built-in functions are not allowed;
     562             :  * - System columns are not allowed.
     563             :  *
     564             :  * NOTES
     565             :  *
     566             :  * We don't allow user-defined functions/operators/types/collations because
     567             :  * (a) if a user drops a user-defined object used in a row filter expression or
     568             :  * if there is any other error while using it, the logical decoding
     569             :  * infrastructure won't be able to recover from such an error even if the
     570             :  * object is recreated again because a historic snapshot is used to evaluate
     571             :  * the row filter;
     572             :  * (b) a user-defined function can be used to access tables that could have
     573             :  * unpleasant results because a historic snapshot is used. That's why only
     574             :  * immutable built-in functions are allowed in row filter expressions.
     575             :  *
     576             :  * We don't allow system columns because currently, we don't have that
     577             :  * information in the tuple passed to downstream. Also, as we don't replicate
     578             :  * those to subscribers, there doesn't seem to be a need for a filter on those
     579             :  * columns.
     580             :  *
     581             :  * We can allow other node types after more analysis and testing.
     582             :  */
     583             : static bool
     584        1440 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
     585             : {
     586        1440 :     char       *errdetail_msg = NULL;
     587             : 
     588        1440 :     if (node == NULL)
     589           6 :         return false;
     590             : 
     591        1434 :     switch (nodeTag(node))
     592             :     {
     593         388 :         case T_Var:
     594             :             /* System columns are not allowed. */
     595         388 :             if (((Var *) node)->varattno < InvalidAttrNumber)
     596           6 :                 errdetail_msg = _("System columns are not allowed.");
     597         388 :             break;
     598         392 :         case T_OpExpr:
     599             :         case T_DistinctExpr:
     600             :         case T_NullIfExpr:
     601             :             /* OK, except user-defined operators are not allowed. */
     602         392 :             if (((OpExpr *) node)->opno >= FirstNormalObjectId)
     603           6 :                 errdetail_msg = _("User-defined operators are not allowed.");
     604         392 :             break;
     605           6 :         case T_ScalarArrayOpExpr:
     606             :             /* OK, except user-defined operators are not allowed. */
     607           6 :             if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
     608           0 :                 errdetail_msg = _("User-defined operators are not allowed.");
     609             : 
     610             :             /*
     611             :              * We don't need to check the hashfuncid and negfuncid of
     612             :              * ScalarArrayOpExpr as those functions are only built for a
     613             :              * subquery.
     614             :              */
     615           6 :             break;
     616           6 :         case T_RowCompareExpr:
     617             :             {
     618             :                 ListCell   *opid;
     619             : 
     620             :                 /* OK, except user-defined operators are not allowed. */
     621          18 :                 foreach(opid, ((RowCompareExpr *) node)->opnos)
     622             :                 {
     623          12 :                     if (lfirst_oid(opid) >= FirstNormalObjectId)
     624             :                     {
     625           0 :                         errdetail_msg = _("User-defined operators are not allowed.");
     626           0 :                         break;
     627             :                     }
     628             :                 }
     629             :             }
     630           6 :             break;
     631         636 :         case T_Const:
     632             :         case T_FuncExpr:
     633             :         case T_BoolExpr:
     634             :         case T_RelabelType:
     635             :         case T_CollateExpr:
     636             :         case T_CaseExpr:
     637             :         case T_CaseTestExpr:
     638             :         case T_ArrayExpr:
     639             :         case T_RowExpr:
     640             :         case T_CoalesceExpr:
     641             :         case T_MinMaxExpr:
     642             :         case T_XmlExpr:
     643             :         case T_NullTest:
     644             :         case T_BooleanTest:
     645             :         case T_List:
     646             :             /* OK, supported */
     647         636 :             break;
     648           6 :         default:
     649           6 :             errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
     650           6 :             break;
     651             :     }
     652             : 
     653             :     /*
     654             :      * For all the supported nodes, if we haven't already found a problem,
     655             :      * check the types, functions, and collations used in it.  We check List
     656             :      * by walking through each element.
     657             :      */
     658        1434 :     if (!errdetail_msg && !IsA(node, List))
     659             :     {
     660        1362 :         if (exprType(node) >= FirstNormalObjectId)
     661           6 :             errdetail_msg = _("User-defined types are not allowed.");
     662        1356 :         else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
     663             :                                          pstate))
     664          12 :             errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
     665        2688 :         else if (exprCollation(node) >= FirstNormalObjectId ||
     666        1344 :                  exprInputCollation(node) >= FirstNormalObjectId)
     667           6 :             errdetail_msg = _("User-defined collations are not allowed.");
     668             :     }
     669             : 
     670             :     /*
     671             :      * If we found a problem in this node, throw error now. Otherwise keep
     672             :      * going.
     673             :      */
     674        1434 :     if (errdetail_msg)
     675          42 :         ereport(ERROR,
     676             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     677             :                  errmsg("invalid publication WHERE expression"),
     678             :                  errdetail_internal("%s", errdetail_msg),
     679             :                  parser_errposition(pstate, exprLocation(node))));
     680             : 
     681        1392 :     return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
     682             :                                   pstate);
     683             : }
     684             : 
     685             : /*
     686             :  * Check if the row filter expression is a "simple expression".
     687             :  *
     688             :  * See check_simple_rowfilter_expr_walker for details.
     689             :  */
     690             : static bool
     691         376 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
     692             : {
     693         376 :     return check_simple_rowfilter_expr_walker(node, pstate);
     694             : }
     695             : 
     696             : /*
     697             :  * Transform the publication WHERE expression for all the relations in the list,
     698             :  * ensuring it is coerced to boolean and necessary collation information is
     699             :  * added if required, and add a new nsitem/RTE for the associated relation to
     700             :  * the ParseState's namespace list.
     701             :  *
     702             :  * Also check the publication row filter expression and throw an error if
     703             :  * anything not permitted or unexpected is encountered.
     704             :  */
     705             : static void
     706        1186 : TransformPubWhereClauses(List *tables, const char *queryString,
     707             :                          bool pubviaroot)
     708             : {
     709             :     ListCell   *lc;
     710             : 
     711        2388 :     foreach(lc, tables)
     712             :     {
     713             :         ParseNamespaceItem *nsitem;
     714        1262 :         Node       *whereclause = NULL;
     715             :         ParseState *pstate;
     716        1262 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     717             : 
     718        1262 :         if (pri->whereClause == NULL)
     719         868 :             continue;
     720             : 
     721             :         /*
     722             :          * If the publication doesn't publish changes via the root partitioned
     723             :          * table, the partition's row filter will be used. So disallow using
     724             :          * WHERE clause on partitioned table in this case.
     725             :          */
     726         394 :         if (!pubviaroot &&
     727         364 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     728           6 :             ereport(ERROR,
     729             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     730             :                      errmsg("cannot use publication WHERE clause for relation \"%s\"",
     731             :                             RelationGetRelationName(pri->relation)),
     732             :                      errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
     733             :                                "publish_via_partition_root")));
     734             : 
     735             :         /*
     736             :          * A fresh pstate is required so that we only have "this" table in its
     737             :          * rangetable
     738             :          */
     739         388 :         pstate = make_parsestate(NULL);
     740         388 :         pstate->p_sourcetext = queryString;
     741         388 :         nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
     742             :                                                AccessShareLock, NULL,
     743             :                                                false, false);
     744         388 :         addNSItemToQuery(pstate, nsitem, false, true, true);
     745             : 
     746         388 :         whereclause = transformWhereClause(pstate,
     747         388 :                                            copyObject(pri->whereClause),
     748             :                                            EXPR_KIND_WHERE,
     749             :                                            "PUBLICATION WHERE");
     750             : 
     751             :         /* Fix up collation information */
     752         376 :         assign_expr_collations(pstate, whereclause);
     753             : 
     754         376 :         whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
     755             : 
     756             :         /*
     757             :          * We allow only simple expressions in row filters. See
     758             :          * check_simple_rowfilter_expr_walker.
     759             :          */
     760         376 :         check_simple_rowfilter_expr(whereclause, pstate);
     761             : 
     762         334 :         free_parsestate(pstate);
     763             : 
     764         334 :         pri->whereClause = whereclause;
     765             :     }
     766        1126 : }
     767             : 
     768             : 
     769             : /*
     770             :  * Given a list of tables that are going to be added to a publication,
     771             :  * verify that they fulfill the necessary preconditions, namely: no tables
     772             :  * have a column list if any schema is published; and partitioned tables do
     773             :  * not have column lists if publish_via_partition_root is not set.
     774             :  *
     775             :  * 'publish_schema' indicates that the publication contains any TABLES IN
     776             :  * SCHEMA elements (newly added in this command, or preexisting).
     777             :  * 'pubviaroot' is the value of publish_via_partition_root.
     778             :  */
     779             : static void
     780        1126 : CheckPubRelationColumnList(char *pubname, List *tables,
     781             :                            bool publish_schema, bool pubviaroot)
     782             : {
     783             :     ListCell   *lc;
     784             : 
     785        2298 :     foreach(lc, tables)
     786             :     {
     787        1202 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     788             : 
     789        1202 :         if (pri->columns == NIL)
     790         798 :             continue;
     791             : 
     792             :         /*
     793             :          * Disallow specifying column list if any schema is in the
     794             :          * publication.
     795             :          *
     796             :          * XXX We could instead just forbid the case when the publication
     797             :          * tries to publish the table with a column list and a schema for that
     798             :          * table. However, if we do that then we need a restriction during
     799             :          * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
     800             :          * seem to be a good idea.
     801             :          */
     802         404 :         if (publish_schema)
     803          24 :             ereport(ERROR,
     804             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     805             :                     errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     806             :                            get_namespace_name(RelationGetNamespace(pri->relation)),
     807             :                            RelationGetRelationName(pri->relation), pubname),
     808             :                     errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
     809             : 
     810             :         /*
     811             :          * If the publication doesn't publish changes via the root partitioned
     812             :          * table, the partition's column list will be used. So disallow using
     813             :          * a column list on the partitioned table in this case.
     814             :          */
     815         380 :         if (!pubviaroot &&
     816         304 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     817           6 :             ereport(ERROR,
     818             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     819             :                      errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     820             :                             get_namespace_name(RelationGetNamespace(pri->relation)),
     821             :                             RelationGetRelationName(pri->relation), pubname),
     822             :                      errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
     823             :                                "publish_via_partition_root")));
     824             :     }
     825        1096 : }
     826             : 
     827             : /*
     828             :  * Create new publication.
     829             :  */
     830             : ObjectAddress
     831         896 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
     832             : {
     833             :     Relation    rel;
     834             :     ObjectAddress myself;
     835             :     Oid         puboid;
     836             :     bool        nulls[Natts_pg_publication];
     837             :     Datum       values[Natts_pg_publication];
     838             :     HeapTuple   tup;
     839             :     bool        publish_given;
     840             :     PublicationActions pubactions;
     841             :     bool        publish_via_partition_root_given;
     842             :     bool        publish_via_partition_root;
     843             :     bool        publish_generated_columns_given;
     844             :     char        publish_generated_columns;
     845             :     AclResult   aclresult;
     846         896 :     List       *relations = NIL;
     847         896 :     List       *schemaidlist = NIL;
     848             : 
     849             :     /* must have CREATE privilege on database */
     850         896 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
     851         896 :     if (aclresult != ACLCHECK_OK)
     852           6 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     853           6 :                        get_database_name(MyDatabaseId));
     854             : 
     855             :     /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
     856         890 :     if (!superuser())
     857             :     {
     858          24 :         if (stmt->for_all_tables || stmt->for_all_sequences)
     859           0 :             ereport(ERROR,
     860             :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     861             :                     errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
     862             :     }
     863             : 
     864         890 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     865             : 
     866             :     /* Check if name is used */
     867         890 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     868             :                              CStringGetDatum(stmt->pubname));
     869         890 :     if (OidIsValid(puboid))
     870           6 :         ereport(ERROR,
     871             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     872             :                  errmsg("publication \"%s\" already exists",
     873             :                         stmt->pubname)));
     874             : 
     875             :     /* Form a tuple. */
     876         884 :     memset(values, 0, sizeof(values));
     877         884 :     memset(nulls, false, sizeof(nulls));
     878             : 
     879         884 :     values[Anum_pg_publication_pubname - 1] =
     880         884 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
     881         884 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
     882             : 
     883         884 :     parse_publication_options(pstate,
     884             :                               stmt->options,
     885             :                               &publish_given, &pubactions,
     886             :                               &publish_via_partition_root_given,
     887             :                               &publish_via_partition_root,
     888             :                               &publish_generated_columns_given,
     889             :                               &publish_generated_columns);
     890             : 
     891         848 :     if (stmt->for_all_sequences &&
     892          28 :         (publish_given || publish_via_partition_root_given ||
     893             :          publish_generated_columns_given))
     894           2 :         ereport(NOTICE,
     895             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     896             :                 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
     897             : 
     898         848 :     puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
     899             :                                 Anum_pg_publication_oid);
     900         848 :     values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
     901         848 :     values[Anum_pg_publication_puballtables - 1] =
     902         848 :         BoolGetDatum(stmt->for_all_tables);
     903         848 :     values[Anum_pg_publication_puballsequences - 1] =
     904         848 :         BoolGetDatum(stmt->for_all_sequences);
     905         848 :     values[Anum_pg_publication_pubinsert - 1] =
     906         848 :         BoolGetDatum(pubactions.pubinsert);
     907         848 :     values[Anum_pg_publication_pubupdate - 1] =
     908         848 :         BoolGetDatum(pubactions.pubupdate);
     909         848 :     values[Anum_pg_publication_pubdelete - 1] =
     910         848 :         BoolGetDatum(pubactions.pubdelete);
     911         848 :     values[Anum_pg_publication_pubtruncate - 1] =
     912         848 :         BoolGetDatum(pubactions.pubtruncate);
     913         848 :     values[Anum_pg_publication_pubviaroot - 1] =
     914         848 :         BoolGetDatum(publish_via_partition_root);
     915         848 :     values[Anum_pg_publication_pubgencols - 1] =
     916         848 :         CharGetDatum(publish_generated_columns);
     917             : 
     918         848 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     919             : 
     920             :     /* Insert tuple into catalog. */
     921         848 :     CatalogTupleInsert(rel, tup);
     922         848 :     heap_freetuple(tup);
     923             : 
     924         848 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
     925             : 
     926         848 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
     927             : 
     928             :     /* Make the changes visible. */
     929         848 :     CommandCounterIncrement();
     930             : 
     931             :     /* Associate objects with the publication. */
     932         848 :     if (stmt->for_all_tables)
     933             :     {
     934             :         /*
     935             :          * Invalidate relcache so that publication info is rebuilt. Sequences
     936             :          * publication doesn't require invalidation, as replica identity
     937             :          * checks don't apply to them.
     938             :          */
     939         106 :         CacheInvalidateRelcacheAll();
     940             :     }
     941         742 :     else if (!stmt->for_all_sequences)
     942             :     {
     943         722 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
     944             :                                    &schemaidlist);
     945             : 
     946             :         /* FOR TABLES IN SCHEMA requires superuser */
     947         704 :         if (schemaidlist != NIL && !superuser())
     948           6 :             ereport(ERROR,
     949             :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     950             :                     errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
     951             : 
     952         698 :         if (relations != NIL)
     953             :         {
     954             :             List       *rels;
     955             : 
     956         474 :             rels = OpenTableList(relations);
     957         444 :             TransformPubWhereClauses(rels, pstate->p_sourcetext,
     958             :                                      publish_via_partition_root);
     959             : 
     960         420 :             CheckPubRelationColumnList(stmt->pubname, rels,
     961             :                                        schemaidlist != NIL,
     962             :                                        publish_via_partition_root);
     963             : 
     964         414 :             PublicationAddTables(puboid, rels, true, NULL);
     965         388 :             CloseTableList(rels);
     966             :         }
     967             : 
     968         612 :         if (schemaidlist != NIL)
     969             :         {
     970             :             /*
     971             :              * Schema lock is held until the publication is created to prevent
     972             :              * concurrent schema deletion.
     973             :              */
     974         152 :             LockSchemaList(schemaidlist);
     975         152 :             PublicationAddSchemas(puboid, schemaidlist, true, NULL);
     976             :         }
     977             :     }
     978             : 
     979         732 :     table_close(rel, RowExclusiveLock);
     980             : 
     981         732 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
     982             : 
     983             :     /*
     984             :      * We don't need this warning message when wal_level >= 'replica' since
     985             :      * logical decoding is automatically enabled up on a logical slot
     986             :      * creation.
     987             :      */
     988         732 :     if (wal_level < WAL_LEVEL_REPLICA)
     989          26 :         ereport(WARNING,
     990             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     991             :                  errmsg("logical decoding must be enabled to publish logical changes"),
     992             :                  errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
     993             : 
     994         732 :     return myself;
     995             : }
     996             : 
     997             : /*
     998             :  * Change options of a publication.
     999             :  */
    1000             : static void
    1001         128 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
    1002             :                         Relation rel, HeapTuple tup)
    1003             : {
    1004             :     bool        nulls[Natts_pg_publication];
    1005             :     bool        replaces[Natts_pg_publication];
    1006             :     Datum       values[Natts_pg_publication];
    1007             :     bool        publish_given;
    1008             :     PublicationActions pubactions;
    1009             :     bool        publish_via_partition_root_given;
    1010             :     bool        publish_via_partition_root;
    1011             :     bool        publish_generated_columns_given;
    1012             :     char        publish_generated_columns;
    1013             :     ObjectAddress obj;
    1014             :     Form_pg_publication pubform;
    1015         128 :     List       *root_relids = NIL;
    1016             :     ListCell   *lc;
    1017             : 
    1018         128 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1019             : 
    1020         128 :     parse_publication_options(pstate,
    1021             :                               stmt->options,
    1022             :                               &publish_given, &pubactions,
    1023             :                               &publish_via_partition_root_given,
    1024             :                               &publish_via_partition_root,
    1025             :                               &publish_generated_columns_given,
    1026             :                               &publish_generated_columns);
    1027             : 
    1028         128 :     if (pubform->puballsequences &&
    1029          12 :         (publish_given || publish_via_partition_root_given ||
    1030             :          publish_generated_columns_given))
    1031          12 :         ereport(NOTICE,
    1032             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1033             :                 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
    1034             : 
    1035             :     /*
    1036             :      * If the publication doesn't publish changes via the root partitioned
    1037             :      * table, the partition's row filter and column list will be used. So
    1038             :      * disallow using WHERE clause and column lists on partitioned table in
    1039             :      * this case.
    1040             :      */
    1041         128 :     if (!pubform->puballtables && publish_via_partition_root_given &&
    1042          80 :         !publish_via_partition_root)
    1043             :     {
    1044             :         /*
    1045             :          * Lock the publication so nobody else can do anything with it. This
    1046             :          * prevents concurrent alter to add partitioned table(s) with WHERE
    1047             :          * clause(s) and/or column lists which we don't allow when not
    1048             :          * publishing via root.
    1049             :          */
    1050          48 :         LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
    1051             :                            AccessShareLock);
    1052             : 
    1053          48 :         root_relids = GetPublicationRelations(pubform->oid,
    1054             :                                               PUBLICATION_PART_ROOT);
    1055             : 
    1056          84 :         foreach(lc, root_relids)
    1057             :         {
    1058          48 :             Oid         relid = lfirst_oid(lc);
    1059             :             HeapTuple   rftuple;
    1060             :             char        relkind;
    1061             :             char       *relname;
    1062             :             bool        has_rowfilter;
    1063             :             bool        has_collist;
    1064             : 
    1065             :             /*
    1066             :              * Beware: we don't have lock on the relations, so cope silently
    1067             :              * with the cache lookups returning NULL.
    1068             :              */
    1069             : 
    1070          48 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1071             :                                       ObjectIdGetDatum(relid),
    1072             :                                       ObjectIdGetDatum(pubform->oid));
    1073          48 :             if (!HeapTupleIsValid(rftuple))
    1074           0 :                 continue;
    1075          48 :             has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
    1076          48 :             has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
    1077          48 :             if (!has_rowfilter && !has_collist)
    1078             :             {
    1079          12 :                 ReleaseSysCache(rftuple);
    1080          12 :                 continue;
    1081             :             }
    1082             : 
    1083          36 :             relkind = get_rel_relkind(relid);
    1084          36 :             if (relkind != RELKIND_PARTITIONED_TABLE)
    1085             :             {
    1086          24 :                 ReleaseSysCache(rftuple);
    1087          24 :                 continue;
    1088             :             }
    1089          12 :             relname = get_rel_name(relid);
    1090          12 :             if (relname == NULL)    /* table concurrently dropped */
    1091             :             {
    1092           0 :                 ReleaseSysCache(rftuple);
    1093           0 :                 continue;
    1094             :             }
    1095             : 
    1096          12 :             if (has_rowfilter)
    1097           6 :                 ereport(ERROR,
    1098             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1099             :                          errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1100             :                                 "publish_via_partition_root",
    1101             :                                 stmt->pubname),
    1102             :                          errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1103             :                                    relname, "publish_via_partition_root")));
    1104             :             Assert(has_collist);
    1105           6 :             ereport(ERROR,
    1106             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1107             :                      errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1108             :                             "publish_via_partition_root",
    1109             :                             stmt->pubname),
    1110             :                      errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1111             :                                relname, "publish_via_partition_root")));
    1112             :         }
    1113             :     }
    1114             : 
    1115             :     /* Everything ok, form a new tuple. */
    1116         116 :     memset(values, 0, sizeof(values));
    1117         116 :     memset(nulls, false, sizeof(nulls));
    1118         116 :     memset(replaces, false, sizeof(replaces));
    1119             : 
    1120         116 :     if (publish_given)
    1121             :     {
    1122          34 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
    1123          34 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
    1124             : 
    1125          34 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
    1126          34 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
    1127             : 
    1128          34 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
    1129          34 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
    1130             : 
    1131          34 :         values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
    1132          34 :         replaces[Anum_pg_publication_pubtruncate - 1] = true;
    1133             :     }
    1134             : 
    1135         116 :     if (publish_via_partition_root_given)
    1136             :     {
    1137          70 :         values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
    1138          70 :         replaces[Anum_pg_publication_pubviaroot - 1] = true;
    1139             :     }
    1140             : 
    1141         116 :     if (publish_generated_columns_given)
    1142             :     {
    1143          12 :         values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
    1144          12 :         replaces[Anum_pg_publication_pubgencols - 1] = true;
    1145             :     }
    1146             : 
    1147         116 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1148             :                             replaces);
    1149             : 
    1150             :     /* Update the catalog. */
    1151         116 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1152             : 
    1153         116 :     CommandCounterIncrement();
    1154             : 
    1155         116 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1156             : 
    1157             :     /* Invalidate the relcache. */
    1158         116 :     if (pubform->puballtables)
    1159             :     {
    1160          20 :         CacheInvalidateRelcacheAll();
    1161             :     }
    1162             :     else
    1163             :     {
    1164          96 :         List       *relids = NIL;
    1165          96 :         List       *schemarelids = NIL;
    1166             : 
    1167             :         /*
    1168             :          * For any partitioned tables contained in the publication, we must
    1169             :          * invalidate all partitions contained in the respective partition
    1170             :          * trees, not just those explicitly mentioned in the publication.
    1171             :          */
    1172          96 :         if (root_relids == NIL)
    1173          60 :             relids = GetPublicationRelations(pubform->oid,
    1174             :                                              PUBLICATION_PART_ALL);
    1175             :         else
    1176             :         {
    1177             :             /*
    1178             :              * We already got tables explicitly mentioned in the publication.
    1179             :              * Now get all partitions for the partitioned table in the list.
    1180             :              */
    1181          72 :             foreach(lc, root_relids)
    1182          36 :                 relids = GetPubPartitionOptionRelations(relids,
    1183             :                                                         PUBLICATION_PART_ALL,
    1184             :                                                         lfirst_oid(lc));
    1185             :         }
    1186             : 
    1187          96 :         schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
    1188             :                                                         PUBLICATION_PART_ALL);
    1189          96 :         relids = list_concat_unique_oid(relids, schemarelids);
    1190             : 
    1191          96 :         InvalidatePublicationRels(relids);
    1192             :     }
    1193             : 
    1194         116 :     ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
    1195         116 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1196             :                                      (Node *) stmt);
    1197             : 
    1198         116 :     InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
    1199         116 : }
    1200             : 
    1201             : /*
    1202             :  * Invalidate the relations.
    1203             :  */
    1204             : void
    1205        2450 : InvalidatePublicationRels(List *relids)
    1206             : {
    1207             :     /*
    1208             :      * We don't want to send too many individual messages, at some point it's
    1209             :      * cheaper to just reset whole relcache.
    1210             :      */
    1211        2450 :     if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
    1212             :     {
    1213             :         ListCell   *lc;
    1214             : 
    1215       20726 :         foreach(lc, relids)
    1216       18276 :             CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
    1217             :     }
    1218             :     else
    1219           0 :         CacheInvalidateRelcacheAll();
    1220        2450 : }
    1221             : 
    1222             : /*
    1223             :  * Add or remove table to/from publication.
    1224             :  */
    1225             : static void
    1226         918 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
    1227             :                        List *tables, const char *queryString,
    1228             :                        bool publish_schema)
    1229             : {
    1230         918 :     List       *rels = NIL;
    1231         918 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1232         918 :     Oid         pubid = pubform->oid;
    1233             : 
    1234             :     /*
    1235             :      * Nothing to do if no objects, except in SET: for that it is quite
    1236             :      * possible that user has not specified any tables in which case we need
    1237             :      * to remove all the existing tables.
    1238             :      */
    1239         918 :     if (!tables && stmt->action != AP_SetObjects)
    1240          76 :         return;
    1241             : 
    1242         842 :     rels = OpenTableList(tables);
    1243             : 
    1244         842 :     if (stmt->action == AP_AddObjects)
    1245             :     {
    1246         296 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1247             : 
    1248         278 :         publish_schema |= is_schema_publication(pubid);
    1249             : 
    1250         278 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1251         278 :                                    pubform->pubviaroot);
    1252             : 
    1253         266 :         PublicationAddTables(pubid, rels, false, stmt);
    1254             :     }
    1255         546 :     else if (stmt->action == AP_DropObjects)
    1256         100 :         PublicationDropTables(pubid, rels, false);
    1257             :     else                        /* AP_SetObjects */
    1258             :     {
    1259         446 :         List       *oldrelids = GetPublicationRelations(pubid,
    1260             :                                                         PUBLICATION_PART_ROOT);
    1261         446 :         List       *delrels = NIL;
    1262             :         ListCell   *oldlc;
    1263             : 
    1264         446 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1265             : 
    1266         428 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1267         428 :                                    pubform->pubviaroot);
    1268             : 
    1269             :         /*
    1270             :          * To recreate the relation list for the publication, look for
    1271             :          * existing relations that do not need to be dropped.
    1272             :          */
    1273         806 :         foreach(oldlc, oldrelids)
    1274             :         {
    1275         402 :             Oid         oldrelid = lfirst_oid(oldlc);
    1276             :             ListCell   *newlc;
    1277             :             PublicationRelInfo *oldrel;
    1278         402 :             bool        found = false;
    1279             :             HeapTuple   rftuple;
    1280         402 :             Node       *oldrelwhereclause = NULL;
    1281         402 :             Bitmapset  *oldcolumns = NULL;
    1282             : 
    1283             :             /* look up the cache for the old relmap */
    1284         402 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1285             :                                       ObjectIdGetDatum(oldrelid),
    1286             :                                       ObjectIdGetDatum(pubid));
    1287             : 
    1288             :             /*
    1289             :              * See if the existing relation currently has a WHERE clause or a
    1290             :              * column list. We need to compare those too.
    1291             :              */
    1292         402 :             if (HeapTupleIsValid(rftuple))
    1293             :             {
    1294         402 :                 bool        isnull = true;
    1295             :                 Datum       whereClauseDatum;
    1296             :                 Datum       columnListDatum;
    1297             : 
    1298             :                 /* Load the WHERE clause for this table. */
    1299         402 :                 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1300             :                                                    Anum_pg_publication_rel_prqual,
    1301             :                                                    &isnull);
    1302         402 :                 if (!isnull)
    1303         210 :                     oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
    1304             : 
    1305             :                 /* Transform the int2vector column list to a bitmap. */
    1306         402 :                 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1307             :                                                   Anum_pg_publication_rel_prattrs,
    1308             :                                                   &isnull);
    1309             : 
    1310         402 :                 if (!isnull)
    1311         136 :                     oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
    1312             : 
    1313         402 :                 ReleaseSysCache(rftuple);
    1314             :             }
    1315             : 
    1316         778 :             foreach(newlc, rels)
    1317             :             {
    1318             :                 PublicationRelInfo *newpubrel;
    1319             :                 Oid         newrelid;
    1320         404 :                 Bitmapset  *newcolumns = NULL;
    1321             : 
    1322         404 :                 newpubrel = (PublicationRelInfo *) lfirst(newlc);
    1323         404 :                 newrelid = RelationGetRelid(newpubrel->relation);
    1324             : 
    1325             :                 /*
    1326             :                  * Validate the column list.  If the column list or WHERE
    1327             :                  * clause changes, then the validation done here will be
    1328             :                  * duplicated inside PublicationAddTables().  The validation
    1329             :                  * is cheap enough that that seems harmless.
    1330             :                  */
    1331         404 :                 newcolumns = pub_collist_validate(newpubrel->relation,
    1332             :                                                   newpubrel->columns);
    1333             : 
    1334             :                 /*
    1335             :                  * Check if any of the new set of relations matches with the
    1336             :                  * existing relations in the publication. Additionally, if the
    1337             :                  * relation has an associated WHERE clause, check the WHERE
    1338             :                  * expressions also match. Same for the column list. Drop the
    1339             :                  * rest.
    1340             :                  */
    1341         392 :                 if (newrelid == oldrelid)
    1342             :                 {
    1343         284 :                     if (equal(oldrelwhereclause, newpubrel->whereClause) &&
    1344          80 :                         bms_equal(oldcolumns, newcolumns))
    1345             :                     {
    1346          16 :                         found = true;
    1347          16 :                         break;
    1348             :                     }
    1349             :                 }
    1350             :             }
    1351             : 
    1352             :             /*
    1353             :              * Add the non-matched relations to a list so that they can be
    1354             :              * dropped.
    1355             :              */
    1356         390 :             if (!found)
    1357             :             {
    1358         374 :                 oldrel = palloc_object(PublicationRelInfo);
    1359         374 :                 oldrel->whereClause = NULL;
    1360         374 :                 oldrel->columns = NIL;
    1361         374 :                 oldrel->relation = table_open(oldrelid,
    1362             :                                               ShareUpdateExclusiveLock);
    1363         374 :                 delrels = lappend(delrels, oldrel);
    1364             :             }
    1365             :         }
    1366             : 
    1367             :         /* And drop them. */
    1368         404 :         PublicationDropTables(pubid, delrels, true);
    1369             : 
    1370             :         /*
    1371             :          * Don't bother calculating the difference for adding, we'll catch and
    1372             :          * skip existing ones when doing catalog update.
    1373             :          */
    1374         404 :         PublicationAddTables(pubid, rels, true, stmt);
    1375             : 
    1376         404 :         CloseTableList(delrels);
    1377             :     }
    1378             : 
    1379         710 :     CloseTableList(rels);
    1380             : }
    1381             : 
    1382             : /*
    1383             :  * Alter the publication schemas.
    1384             :  *
    1385             :  * Add or remove schemas to/from publication.
    1386             :  */
    1387             : static void
    1388         786 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
    1389             :                         HeapTuple tup, List *schemaidlist)
    1390             : {
    1391         786 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1392             : 
    1393             :     /*
    1394             :      * Nothing to do if no objects, except in SET: for that it is quite
    1395             :      * possible that user has not specified any schemas in which case we need
    1396             :      * to remove all the existing schemas.
    1397             :      */
    1398         786 :     if (!schemaidlist && stmt->action != AP_SetObjects)
    1399         306 :         return;
    1400             : 
    1401             :     /*
    1402             :      * Schema lock is held until the publication is altered to prevent
    1403             :      * concurrent schema deletion.
    1404             :      */
    1405         480 :     LockSchemaList(schemaidlist);
    1406         480 :     if (stmt->action == AP_AddObjects)
    1407             :     {
    1408             :         ListCell   *lc;
    1409             :         List       *reloids;
    1410             : 
    1411          38 :         reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
    1412             : 
    1413          54 :         foreach(lc, reloids)
    1414             :         {
    1415             :             HeapTuple   coltuple;
    1416             : 
    1417          22 :             coltuple = SearchSysCache2(PUBLICATIONRELMAP,
    1418             :                                        ObjectIdGetDatum(lfirst_oid(lc)),
    1419             :                                        ObjectIdGetDatum(pubform->oid));
    1420             : 
    1421          22 :             if (!HeapTupleIsValid(coltuple))
    1422           0 :                 continue;
    1423             : 
    1424             :             /*
    1425             :              * Disallow adding schema if column list is already part of the
    1426             :              * publication. See CheckPubRelationColumnList.
    1427             :              */
    1428          22 :             if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
    1429           6 :                 ereport(ERROR,
    1430             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1431             :                         errmsg("cannot add schema to publication \"%s\"",
    1432             :                                stmt->pubname),
    1433             :                         errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
    1434             : 
    1435          16 :             ReleaseSysCache(coltuple);
    1436             :         }
    1437             : 
    1438          32 :         PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
    1439             :     }
    1440         442 :     else if (stmt->action == AP_DropObjects)
    1441          38 :         PublicationDropSchemas(pubform->oid, schemaidlist, false);
    1442             :     else                        /* AP_SetObjects */
    1443             :     {
    1444         404 :         List       *oldschemaids = GetPublicationSchemas(pubform->oid);
    1445         404 :         List       *delschemas = NIL;
    1446             : 
    1447             :         /* Identify which schemas should be dropped */
    1448         404 :         delschemas = list_difference_oid(oldschemaids, schemaidlist);
    1449             : 
    1450             :         /*
    1451             :          * Schema lock is held until the publication is altered to prevent
    1452             :          * concurrent schema deletion.
    1453             :          */
    1454         404 :         LockSchemaList(delschemas);
    1455             : 
    1456             :         /* And drop them */
    1457         404 :         PublicationDropSchemas(pubform->oid, delschemas, true);
    1458             : 
    1459             :         /*
    1460             :          * Don't bother calculating the difference for adding, we'll catch and
    1461             :          * skip existing ones when doing catalog update.
    1462             :          */
    1463         404 :         PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
    1464             :     }
    1465             : }
    1466             : 
    1467             : /*
    1468             :  * Check if relations and schemas can be in a given publication and throw
    1469             :  * appropriate error if not.
    1470             :  */
    1471             : static void
    1472         960 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
    1473             :                       List *tables, List *schemaidlist)
    1474             : {
    1475         960 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1476             : 
    1477         960 :     if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
    1478         104 :         schemaidlist && !superuser())
    1479           6 :         ereport(ERROR,
    1480             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1481             :                  errmsg("must be superuser to add or set schemas")));
    1482             : 
    1483             :     /*
    1484             :      * Check that user is allowed to manipulate the publication tables in
    1485             :      * schema
    1486             :      */
    1487         954 :     if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
    1488             :     {
    1489          18 :         if (pubform->puballtables && pubform->puballsequences)
    1490           0 :             ereport(ERROR,
    1491             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1492             :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
    1493             :                            NameStr(pubform->pubname)),
    1494             :                     errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
    1495          18 :         else if (pubform->puballtables)
    1496          18 :             ereport(ERROR,
    1497             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1498             :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1499             :                            NameStr(pubform->pubname)),
    1500             :                     errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
    1501             :         else
    1502           0 :             ereport(ERROR,
    1503             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1504             :                     errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
    1505             :                            NameStr(pubform->pubname)),
    1506             :                     errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
    1507             :     }
    1508             : 
    1509             :     /* Check that user is allowed to manipulate the publication tables. */
    1510         936 :     if (tables && (pubform->puballtables || pubform->puballsequences))
    1511             :     {
    1512          18 :         if (pubform->puballtables && pubform->puballsequences)
    1513           0 :             ereport(ERROR,
    1514             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1515             :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
    1516             :                            NameStr(pubform->pubname)),
    1517             :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
    1518          18 :         else if (pubform->puballtables)
    1519          18 :             ereport(ERROR,
    1520             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1521             :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1522             :                            NameStr(pubform->pubname)),
    1523             :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
    1524             :         else
    1525           0 :             ereport(ERROR,
    1526             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1527             :                     errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
    1528             :                            NameStr(pubform->pubname)),
    1529             :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
    1530             :     }
    1531         918 : }
    1532             : 
    1533             : /*
    1534             :  * Alter the existing publication.
    1535             :  *
    1536             :  * This is dispatcher function for AlterPublicationOptions,
    1537             :  * AlterPublicationSchemas and AlterPublicationTables.
    1538             :  */
    1539             : void
    1540        1106 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
    1541             : {
    1542             :     Relation    rel;
    1543             :     HeapTuple   tup;
    1544             :     Form_pg_publication pubform;
    1545             : 
    1546        1106 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1547             : 
    1548        1106 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
    1549             :                               CStringGetDatum(stmt->pubname));
    1550             : 
    1551        1106 :     if (!HeapTupleIsValid(tup))
    1552           0 :         ereport(ERROR,
    1553             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1554             :                  errmsg("publication \"%s\" does not exist",
    1555             :                         stmt->pubname)));
    1556             : 
    1557        1106 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1558             : 
    1559             :     /* must be owner */
    1560        1106 :     if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
    1561           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1562           0 :                        stmt->pubname);
    1563             : 
    1564        1106 :     if (stmt->options)
    1565         128 :         AlterPublicationOptions(pstate, stmt, rel, tup);
    1566             :     else
    1567             :     {
    1568         978 :         List       *relations = NIL;
    1569         978 :         List       *schemaidlist = NIL;
    1570         978 :         Oid         pubid = pubform->oid;
    1571             : 
    1572         978 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
    1573             :                                    &schemaidlist);
    1574             : 
    1575         960 :         CheckAlterPublication(stmt, tup, relations, schemaidlist);
    1576             : 
    1577         918 :         heap_freetuple(tup);
    1578             : 
    1579             :         /* Lock the publication so nobody else can do anything with it. */
    1580         918 :         LockDatabaseObject(PublicationRelationId, pubid, 0,
    1581             :                            AccessExclusiveLock);
    1582             : 
    1583             :         /*
    1584             :          * It is possible that by the time we acquire the lock on publication,
    1585             :          * concurrent DDL has removed it. We can test this by checking the
    1586             :          * existence of publication. We get the tuple again to avoid the risk
    1587             :          * of any publication option getting changed.
    1588             :          */
    1589         918 :         tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1590         918 :         if (!HeapTupleIsValid(tup))
    1591           0 :             ereport(ERROR,
    1592             :                     errcode(ERRCODE_UNDEFINED_OBJECT),
    1593             :                     errmsg("publication \"%s\" does not exist",
    1594             :                            stmt->pubname));
    1595             : 
    1596         918 :         AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
    1597             :                                schemaidlist != NIL);
    1598         786 :         AlterPublicationSchemas(stmt, tup, schemaidlist);
    1599             :     }
    1600             : 
    1601             :     /* Cleanup. */
    1602         884 :     heap_freetuple(tup);
    1603         884 :     table_close(rel, RowExclusiveLock);
    1604         884 : }
    1605             : 
    1606             : /*
    1607             :  * Remove relation from publication by mapping OID.
    1608             :  */
    1609             : void
    1610         848 : RemovePublicationRelById(Oid proid)
    1611             : {
    1612             :     Relation    rel;
    1613             :     HeapTuple   tup;
    1614             :     Form_pg_publication_rel pubrel;
    1615         848 :     List       *relids = NIL;
    1616             : 
    1617         848 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
    1618             : 
    1619         848 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
    1620             : 
    1621         848 :     if (!HeapTupleIsValid(tup))
    1622           0 :         elog(ERROR, "cache lookup failed for publication table %u",
    1623             :              proid);
    1624             : 
    1625         848 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
    1626             : 
    1627             :     /*
    1628             :      * Invalidate relcache so that publication info is rebuilt.
    1629             :      *
    1630             :      * For the partitioned tables, we must invalidate all partitions contained
    1631             :      * in the respective partition hierarchies, not just the one explicitly
    1632             :      * mentioned in the publication. This is required because we implicitly
    1633             :      * publish the child tables when the parent table is published.
    1634             :      */
    1635         848 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
    1636             :                                             pubrel->prrelid);
    1637             : 
    1638         848 :     InvalidatePublicationRels(relids);
    1639             : 
    1640         848 :     CatalogTupleDelete(rel, &tup->t_self);
    1641             : 
    1642         848 :     ReleaseSysCache(tup);
    1643             : 
    1644         848 :     table_close(rel, RowExclusiveLock);
    1645         848 : }
    1646             : 
    1647             : /*
    1648             :  * Remove the publication by mapping OID.
    1649             :  */
    1650             : void
    1651         500 : RemovePublicationById(Oid pubid)
    1652             : {
    1653             :     Relation    rel;
    1654             :     HeapTuple   tup;
    1655             :     Form_pg_publication pubform;
    1656             : 
    1657         500 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1658             : 
    1659         500 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1660         500 :     if (!HeapTupleIsValid(tup))
    1661           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1662             : 
    1663         500 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1664             : 
    1665             :     /* Invalidate relcache so that publication info is rebuilt. */
    1666         500 :     if (pubform->puballtables)
    1667          70 :         CacheInvalidateRelcacheAll();
    1668             : 
    1669         500 :     CatalogTupleDelete(rel, &tup->t_self);
    1670             : 
    1671         500 :     ReleaseSysCache(tup);
    1672             : 
    1673         500 :     table_close(rel, RowExclusiveLock);
    1674         500 : }
    1675             : 
    1676             : /*
    1677             :  * Remove schema from publication by mapping OID.
    1678             :  */
    1679             : void
    1680         192 : RemovePublicationSchemaById(Oid psoid)
    1681             : {
    1682             :     Relation    rel;
    1683             :     HeapTuple   tup;
    1684         192 :     List       *schemaRels = NIL;
    1685             :     Form_pg_publication_namespace pubsch;
    1686             : 
    1687         192 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
    1688             : 
    1689         192 :     tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
    1690             : 
    1691         192 :     if (!HeapTupleIsValid(tup))
    1692           0 :         elog(ERROR, "cache lookup failed for publication schema %u", psoid);
    1693             : 
    1694         192 :     pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
    1695             : 
    1696             :     /*
    1697             :      * Invalidate relcache so that publication info is rebuilt. See
    1698             :      * RemovePublicationRelById for why we need to consider all the
    1699             :      * partitions.
    1700             :      */
    1701         192 :     schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
    1702             :                                                PUBLICATION_PART_ALL);
    1703         192 :     InvalidatePublicationRels(schemaRels);
    1704             : 
    1705         192 :     CatalogTupleDelete(rel, &tup->t_self);
    1706             : 
    1707         192 :     ReleaseSysCache(tup);
    1708             : 
    1709         192 :     table_close(rel, RowExclusiveLock);
    1710         192 : }
    1711             : 
    1712             : /*
    1713             :  * Open relations specified by a PublicationTable list.
    1714             :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
    1715             :  * add them to a publication.
    1716             :  */
    1717             : static List *
    1718        1316 : OpenTableList(List *tables)
    1719             : {
    1720        1316 :     List       *relids = NIL;
    1721        1316 :     List       *rels = NIL;
    1722             :     ListCell   *lc;
    1723        1316 :     List       *relids_with_rf = NIL;
    1724        1316 :     List       *relids_with_collist = NIL;
    1725             : 
    1726             :     /*
    1727             :      * Open, share-lock, and check all the explicitly-specified relations
    1728             :      */
    1729        2700 :     foreach(lc, tables)
    1730             :     {
    1731        1414 :         PublicationTable *t = lfirst_node(PublicationTable, lc);
    1732        1414 :         bool        recurse = t->relation->inh;
    1733             :         Relation    rel;
    1734             :         Oid         myrelid;
    1735             :         PublicationRelInfo *pub_rel;
    1736             : 
    1737             :         /* Allow query cancel in case this takes a long time */
    1738        1414 :         CHECK_FOR_INTERRUPTS();
    1739             : 
    1740        1414 :         rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
    1741        1408 :         myrelid = RelationGetRelid(rel);
    1742             : 
    1743             :         /*
    1744             :          * Filter out duplicates if user specifies "foo, foo".
    1745             :          *
    1746             :          * Note that this algorithm is known to not be very efficient (O(N^2))
    1747             :          * but given that it only works on list of tables given to us by user
    1748             :          * it's deemed acceptable.
    1749             :          */
    1750        1408 :         if (list_member_oid(relids, myrelid))
    1751             :         {
    1752             :             /* Disallow duplicate tables if there are any with row filters. */
    1753          24 :             if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
    1754          12 :                 ereport(ERROR,
    1755             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1756             :                          errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1757             :                                 RelationGetRelationName(rel))));
    1758             : 
    1759             :             /* Disallow duplicate tables if there are any with column lists. */
    1760          12 :             if (t->columns || list_member_oid(relids_with_collist, myrelid))
    1761          12 :                 ereport(ERROR,
    1762             :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1763             :                          errmsg("conflicting or redundant column lists for table \"%s\"",
    1764             :                                 RelationGetRelationName(rel))));
    1765             : 
    1766           0 :             table_close(rel, ShareUpdateExclusiveLock);
    1767           0 :             continue;
    1768             :         }
    1769             : 
    1770        1384 :         pub_rel = palloc_object(PublicationRelInfo);
    1771        1384 :         pub_rel->relation = rel;
    1772        1384 :         pub_rel->whereClause = t->whereClause;
    1773        1384 :         pub_rel->columns = t->columns;
    1774        1384 :         rels = lappend(rels, pub_rel);
    1775        1384 :         relids = lappend_oid(relids, myrelid);
    1776             : 
    1777        1384 :         if (t->whereClause)
    1778         404 :             relids_with_rf = lappend_oid(relids_with_rf, myrelid);
    1779             : 
    1780        1384 :         if (t->columns)
    1781         410 :             relids_with_collist = lappend_oid(relids_with_collist, myrelid);
    1782             : 
    1783             :         /*
    1784             :          * Add children of this rel, if requested, so that they too are added
    1785             :          * to the publication.  A partitioned table can't have any inheritance
    1786             :          * children other than its partitions, which need not be explicitly
    1787             :          * added to the publication.
    1788             :          */
    1789        1384 :         if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
    1790             :         {
    1791             :             List       *children;
    1792             :             ListCell   *child;
    1793             : 
    1794        1160 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
    1795             :                                            NULL);
    1796             : 
    1797        2328 :             foreach(child, children)
    1798             :             {
    1799        1168 :                 Oid         childrelid = lfirst_oid(child);
    1800             : 
    1801             :                 /* Allow query cancel in case this takes a long time */
    1802        1168 :                 CHECK_FOR_INTERRUPTS();
    1803             : 
    1804             :                 /*
    1805             :                  * Skip duplicates if user specified both parent and child
    1806             :                  * tables.
    1807             :                  */
    1808        1168 :                 if (list_member_oid(relids, childrelid))
    1809             :                 {
    1810             :                     /*
    1811             :                      * We don't allow to specify row filter for both parent
    1812             :                      * and child table at the same time as it is not very
    1813             :                      * clear which one should be given preference.
    1814             :                      */
    1815        1160 :                     if (childrelid != myrelid &&
    1816           0 :                         (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
    1817           0 :                         ereport(ERROR,
    1818             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1819             :                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1820             :                                         RelationGetRelationName(rel))));
    1821             : 
    1822             :                     /*
    1823             :                      * We don't allow to specify column list for both parent
    1824             :                      * and child table at the same time as it is not very
    1825             :                      * clear which one should be given preference.
    1826             :                      */
    1827        1160 :                     if (childrelid != myrelid &&
    1828           0 :                         (t->columns || list_member_oid(relids_with_collist, childrelid)))
    1829           0 :                         ereport(ERROR,
    1830             :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1831             :                                  errmsg("conflicting or redundant column lists for table \"%s\"",
    1832             :                                         RelationGetRelationName(rel))));
    1833             : 
    1834        1160 :                     continue;
    1835             :                 }
    1836             : 
    1837             :                 /* find_all_inheritors already got lock */
    1838           8 :                 rel = table_open(childrelid, NoLock);
    1839           8 :                 pub_rel = palloc_object(PublicationRelInfo);
    1840           8 :                 pub_rel->relation = rel;
    1841             :                 /* child inherits WHERE clause from parent */
    1842           8 :                 pub_rel->whereClause = t->whereClause;
    1843             : 
    1844             :                 /* child inherits column list from parent */
    1845           8 :                 pub_rel->columns = t->columns;
    1846           8 :                 rels = lappend(rels, pub_rel);
    1847           8 :                 relids = lappend_oid(relids, childrelid);
    1848             : 
    1849           8 :                 if (t->whereClause)
    1850           2 :                     relids_with_rf = lappend_oid(relids_with_rf, childrelid);
    1851             : 
    1852           8 :                 if (t->columns)
    1853           0 :                     relids_with_collist = lappend_oid(relids_with_collist, childrelid);
    1854             :             }
    1855             :         }
    1856             :     }
    1857             : 
    1858        1286 :     list_free(relids);
    1859        1286 :     list_free(relids_with_rf);
    1860             : 
    1861        1286 :     return rels;
    1862             : }
    1863             : 
    1864             : /*
    1865             :  * Close all relations in the list.
    1866             :  */
    1867             : static void
    1868        1502 : CloseTableList(List *rels)
    1869             : {
    1870             :     ListCell   *lc;
    1871             : 
    1872        3056 :     foreach(lc, rels)
    1873             :     {
    1874             :         PublicationRelInfo *pub_rel;
    1875             : 
    1876        1554 :         pub_rel = (PublicationRelInfo *) lfirst(lc);
    1877        1554 :         table_close(pub_rel->relation, NoLock);
    1878             :     }
    1879             : 
    1880        1502 :     list_free_deep(rels);
    1881        1502 : }
    1882             : 
    1883             : /*
    1884             :  * Lock the schemas specified in the schema list in AccessShareLock mode in
    1885             :  * order to prevent concurrent schema deletion.
    1886             :  */
    1887             : static void
    1888        1036 : LockSchemaList(List *schemalist)
    1889             : {
    1890             :     ListCell   *lc;
    1891             : 
    1892        1360 :     foreach(lc, schemalist)
    1893             :     {
    1894         324 :         Oid         schemaid = lfirst_oid(lc);
    1895             : 
    1896             :         /* Allow query cancel in case this takes a long time */
    1897         324 :         CHECK_FOR_INTERRUPTS();
    1898         324 :         LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
    1899             : 
    1900             :         /*
    1901             :          * It is possible that by the time we acquire the lock on schema,
    1902             :          * concurrent DDL has removed it. We can test this by checking the
    1903             :          * existence of schema.
    1904             :          */
    1905         324 :         if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
    1906           0 :             ereport(ERROR,
    1907             :                     errcode(ERRCODE_UNDEFINED_SCHEMA),
    1908             :                     errmsg("schema with OID %u does not exist", schemaid));
    1909             :     }
    1910        1036 : }
    1911             : 
    1912             : /*
    1913             :  * Add listed tables to the publication.
    1914             :  */
    1915             : static void
    1916        1084 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
    1917             :                      AlterPublicationStmt *stmt)
    1918             : {
    1919             :     ListCell   *lc;
    1920             : 
    1921        2176 :     foreach(lc, rels)
    1922             :     {
    1923        1160 :         PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
    1924        1160 :         Relation    rel = pub_rel->relation;
    1925             :         ObjectAddress obj;
    1926             : 
    1927             :         /* Must be owner of the table or superuser. */
    1928        1160 :         if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
    1929           6 :             aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
    1930           6 :                            RelationGetRelationName(rel));
    1931             : 
    1932        1154 :         obj = publication_add_relation(pubid, pub_rel, if_not_exists);
    1933        1092 :         if (stmt)
    1934             :         {
    1935         628 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1936             :                                              (Node *) stmt);
    1937             : 
    1938         628 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
    1939             :                                        obj.objectId, 0);
    1940             :         }
    1941             :     }
    1942        1016 : }
    1943             : 
    1944             : /*
    1945             :  * Remove listed tables from the publication.
    1946             :  */
    1947             : static void
    1948         504 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
    1949             : {
    1950             :     ObjectAddress obj;
    1951             :     ListCell   *lc;
    1952             :     Oid         prid;
    1953             : 
    1954         966 :     foreach(lc, rels)
    1955             :     {
    1956         480 :         PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
    1957         480 :         Relation    rel = pubrel->relation;
    1958         480 :         Oid         relid = RelationGetRelid(rel);
    1959             : 
    1960         480 :         if (pubrel->columns)
    1961           0 :             ereport(ERROR,
    1962             :                     errcode(ERRCODE_SYNTAX_ERROR),
    1963             :                     errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
    1964             : 
    1965         480 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
    1966             :                                ObjectIdGetDatum(relid),
    1967             :                                ObjectIdGetDatum(pubid));
    1968         480 :         if (!OidIsValid(prid))
    1969             :         {
    1970          12 :             if (missing_ok)
    1971           0 :                 continue;
    1972             : 
    1973          12 :             ereport(ERROR,
    1974             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1975             :                      errmsg("relation \"%s\" is not part of the publication",
    1976             :                             RelationGetRelationName(rel))));
    1977             :         }
    1978             : 
    1979         468 :         if (pubrel->whereClause)
    1980           6 :             ereport(ERROR,
    1981             :                     (errcode(ERRCODE_SYNTAX_ERROR),
    1982             :                      errmsg("cannot use a WHERE clause when removing a table from a publication")));
    1983             : 
    1984         462 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
    1985         462 :         performDeletion(&obj, DROP_CASCADE, 0);
    1986             :     }
    1987         486 : }
    1988             : 
    1989             : /*
    1990             :  * Add listed schemas to the publication.
    1991             :  */
    1992             : static void
    1993         588 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
    1994             :                       AlterPublicationStmt *stmt)
    1995             : {
    1996             :     ListCell   *lc;
    1997             : 
    1998         838 :     foreach(lc, schemas)
    1999             :     {
    2000         262 :         Oid         schemaid = lfirst_oid(lc);
    2001             :         ObjectAddress obj;
    2002             : 
    2003         262 :         obj = publication_add_schema(pubid, schemaid, if_not_exists);
    2004         250 :         if (stmt)
    2005             :         {
    2006          68 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    2007             :                                              (Node *) stmt);
    2008             : 
    2009          68 :             InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
    2010             :                                        obj.objectId, 0);
    2011             :         }
    2012             :     }
    2013         576 : }
    2014             : 
    2015             : /*
    2016             :  * Remove listed schemas from the publication.
    2017             :  */
    2018             : static void
    2019         442 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
    2020             : {
    2021             :     ObjectAddress obj;
    2022             :     ListCell   *lc;
    2023             :     Oid         psid;
    2024             : 
    2025         492 :     foreach(lc, schemas)
    2026             :     {
    2027          56 :         Oid         schemaid = lfirst_oid(lc);
    2028             : 
    2029          56 :         psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
    2030             :                                Anum_pg_publication_namespace_oid,
    2031             :                                ObjectIdGetDatum(schemaid),
    2032             :                                ObjectIdGetDatum(pubid));
    2033          56 :         if (!OidIsValid(psid))
    2034             :         {
    2035           6 :             if (missing_ok)
    2036           0 :                 continue;
    2037             : 
    2038           6 :             ereport(ERROR,
    2039             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2040             :                      errmsg("tables from schema \"%s\" are not part of the publication",
    2041             :                             get_namespace_name(schemaid))));
    2042             :         }
    2043             : 
    2044          50 :         ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
    2045          50 :         performDeletion(&obj, DROP_CASCADE, 0);
    2046             :     }
    2047         436 : }
    2048             : 
    2049             : /*
    2050             :  * Internal workhorse for changing a publication owner
    2051             :  */
    2052             : static void
    2053          36 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    2054             : {
    2055             :     Form_pg_publication form;
    2056             : 
    2057          36 :     form = (Form_pg_publication) GETSTRUCT(tup);
    2058             : 
    2059          36 :     if (form->pubowner == newOwnerId)
    2060          12 :         return;
    2061             : 
    2062          24 :     if (!superuser())
    2063             :     {
    2064             :         AclResult   aclresult;
    2065             : 
    2066             :         /* Must be owner */
    2067          12 :         if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
    2068           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    2069           0 :                            NameStr(form->pubname));
    2070             : 
    2071             :         /* Must be able to become new owner */
    2072          12 :         check_can_set_role(GetUserId(), newOwnerId);
    2073             : 
    2074             :         /* New owner must have CREATE privilege on database */
    2075          12 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
    2076          12 :         if (aclresult != ACLCHECK_OK)
    2077           0 :             aclcheck_error(aclresult, OBJECT_DATABASE,
    2078           0 :                            get_database_name(MyDatabaseId));
    2079             : 
    2080          12 :         if (!superuser_arg(newOwnerId))
    2081             :         {
    2082          12 :             if (form->puballtables || form->puballsequences ||
    2083           6 :                 is_schema_publication(form->oid))
    2084           6 :                 ereport(ERROR,
    2085             :                         errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2086             :                         errmsg("permission denied to change owner of publication \"%s\"",
    2087             :                                NameStr(form->pubname)),
    2088             :                         errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
    2089             :         }
    2090             :     }
    2091             : 
    2092          18 :     form->pubowner = newOwnerId;
    2093          18 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    2094             : 
    2095             :     /* Update owner dependency reference */
    2096          18 :     changeDependencyOnOwner(PublicationRelationId,
    2097             :                             form->oid,
    2098             :                             newOwnerId);
    2099             : 
    2100          18 :     InvokeObjectPostAlterHook(PublicationRelationId,
    2101             :                               form->oid, 0);
    2102             : }
    2103             : 
    2104             : /*
    2105             :  * Change publication owner -- by name
    2106             :  */
    2107             : ObjectAddress
    2108          36 : AlterPublicationOwner(const char *name, Oid newOwnerId)
    2109             : {
    2110             :     Oid         pubid;
    2111             :     HeapTuple   tup;
    2112             :     Relation    rel;
    2113             :     ObjectAddress address;
    2114             :     Form_pg_publication pubform;
    2115             : 
    2116          36 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2117             : 
    2118          36 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
    2119             : 
    2120          36 :     if (!HeapTupleIsValid(tup))
    2121           0 :         ereport(ERROR,
    2122             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2123             :                  errmsg("publication \"%s\" does not exist", name)));
    2124             : 
    2125          36 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    2126          36 :     pubid = pubform->oid;
    2127             : 
    2128          36 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2129             : 
    2130          30 :     ObjectAddressSet(address, PublicationRelationId, pubid);
    2131             : 
    2132          30 :     heap_freetuple(tup);
    2133             : 
    2134          30 :     table_close(rel, RowExclusiveLock);
    2135             : 
    2136          30 :     return address;
    2137             : }
    2138             : 
    2139             : /*
    2140             :  * Change publication owner -- by OID
    2141             :  */
    2142             : void
    2143           0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
    2144             : {
    2145             :     HeapTuple   tup;
    2146             :     Relation    rel;
    2147             : 
    2148           0 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2149             : 
    2150           0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    2151             : 
    2152           0 :     if (!HeapTupleIsValid(tup))
    2153           0 :         ereport(ERROR,
    2154             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2155             :                  errmsg("publication with OID %u does not exist", pubid)));
    2156             : 
    2157           0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2158             : 
    2159           0 :     heap_freetuple(tup);
    2160             : 
    2161           0 :     table_close(rel, RowExclusiveLock);
    2162           0 : }
    2163             : 
    2164             : /*
    2165             :  * Extract the publish_generated_columns option value from a DefElem. "stored"
    2166             :  * and "none" values are accepted.
    2167             :  */
    2168             : static char
    2169          76 : defGetGeneratedColsOption(DefElem *def)
    2170             : {
    2171          76 :     char       *sval = "";
    2172             : 
    2173             :     /*
    2174             :      * A parameter value is required.
    2175             :      */
    2176          76 :     if (def->arg)
    2177             :     {
    2178          70 :         sval = defGetString(def);
    2179             : 
    2180          70 :         if (pg_strcasecmp(sval, "none") == 0)
    2181          22 :             return PUBLISH_GENCOLS_NONE;
    2182          48 :         if (pg_strcasecmp(sval, "stored") == 0)
    2183          42 :             return PUBLISH_GENCOLS_STORED;
    2184             :     }
    2185             : 
    2186          12 :     ereport(ERROR,
    2187             :             errcode(ERRCODE_SYNTAX_ERROR),
    2188             :             errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
    2189             :             errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
    2190             : 
    2191             :     return PUBLISH_GENCOLS_NONE;    /* keep compiler quiet */
    2192             : }

Generated by: LCOV version 1.16