LCOV - code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 92.4 % 710 656
Test Date: 2026-02-27 13:14:42 Functions: 96.9 % 32 31
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * publicationcmds.c
       4              :  *      publication manipulation
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/backend/commands/publicationcmds.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/htup_details.h"
      18              : #include "access/table.h"
      19              : #include "access/xact.h"
      20              : #include "catalog/catalog.h"
      21              : #include "catalog/indexing.h"
      22              : #include "catalog/namespace.h"
      23              : #include "catalog/objectaccess.h"
      24              : #include "catalog/objectaddress.h"
      25              : #include "catalog/pg_database.h"
      26              : #include "catalog/pg_inherits.h"
      27              : #include "catalog/pg_namespace.h"
      28              : #include "catalog/pg_proc.h"
      29              : #include "catalog/pg_publication.h"
      30              : #include "catalog/pg_publication_namespace.h"
      31              : #include "catalog/pg_publication_rel.h"
      32              : #include "commands/defrem.h"
      33              : #include "commands/event_trigger.h"
      34              : #include "commands/publicationcmds.h"
      35              : #include "miscadmin.h"
      36              : #include "nodes/nodeFuncs.h"
      37              : #include "parser/parse_clause.h"
      38              : #include "parser/parse_collate.h"
      39              : #include "parser/parse_relation.h"
      40              : #include "rewrite/rewriteHandler.h"
      41              : #include "storage/lmgr.h"
      42              : #include "utils/acl.h"
      43              : #include "utils/builtins.h"
      44              : #include "utils/inval.h"
      45              : #include "utils/lsyscache.h"
      46              : #include "utils/rel.h"
      47              : #include "utils/syscache.h"
      48              : #include "utils/varlena.h"
      49              : 
      50              : 
      51              : /*
      52              :  * Information used to validate the columns in the row filter expression. See
      53              :  * contain_invalid_rfcolumn_walker for details.
      54              :  */
      55              : typedef struct rf_context
      56              : {
      57              :     Bitmapset  *bms_replident;  /* bitset of replica identity columns */
      58              :     bool        pubviaroot;     /* true if we are validating the parent
      59              :                                  * relation's row filter */
      60              :     Oid         relid;          /* relid of the relation */
      61              :     Oid         parentid;       /* relid of the parent relation */
      62              : } rf_context;
      63              : 
      64              : static List *OpenTableList(List *tables);
      65              : static void CloseTableList(List *rels);
      66              : static void LockSchemaList(List *schemalist);
      67              : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
      68              :                                  AlterPublicationStmt *stmt);
      69              : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
      70              : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
      71              :                                   AlterPublicationStmt *stmt);
      72              : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
      73              : static char defGetGeneratedColsOption(DefElem *def);
      74              : 
      75              : 
      76              : static void
      77          506 : parse_publication_options(ParseState *pstate,
      78              :                           List *options,
      79              :                           bool *publish_given,
      80              :                           PublicationActions *pubactions,
      81              :                           bool *publish_via_partition_root_given,
      82              :                           bool *publish_via_partition_root,
      83              :                           bool *publish_generated_columns_given,
      84              :                           char *publish_generated_columns)
      85              : {
      86              :     ListCell   *lc;
      87              : 
      88          506 :     *publish_given = false;
      89          506 :     *publish_via_partition_root_given = false;
      90          506 :     *publish_generated_columns_given = false;
      91              : 
      92              :     /* defaults */
      93          506 :     pubactions->pubinsert = true;
      94          506 :     pubactions->pubupdate = true;
      95          506 :     pubactions->pubdelete = true;
      96          506 :     pubactions->pubtruncate = true;
      97          506 :     *publish_via_partition_root = false;
      98          506 :     *publish_generated_columns = PUBLISH_GENCOLS_NONE;
      99              : 
     100              :     /* Parse options */
     101          688 :     foreach(lc, options)
     102              :     {
     103          200 :         DefElem    *defel = (DefElem *) lfirst(lc);
     104              : 
     105          200 :         if (strcmp(defel->defname, "publish") == 0)
     106              :         {
     107              :             char       *publish;
     108              :             List       *publish_list;
     109              :             ListCell   *lc2;
     110              : 
     111           70 :             if (*publish_given)
     112            0 :                 errorConflictingDefElem(defel, pstate);
     113              : 
     114              :             /*
     115              :              * If publish option was given only the explicitly listed actions
     116              :              * should be published.
     117              :              */
     118           70 :             pubactions->pubinsert = false;
     119           70 :             pubactions->pubupdate = false;
     120           70 :             pubactions->pubdelete = false;
     121           70 :             pubactions->pubtruncate = false;
     122              : 
     123           70 :             *publish_given = true;
     124              : 
     125              :             /*
     126              :              * SplitIdentifierString destructively modifies its input, so make
     127              :              * a copy so we don't modify the memory of the executing statement
     128              :              */
     129           70 :             publish = pstrdup(defGetString(defel));
     130              : 
     131           70 :             if (!SplitIdentifierString(publish, ',', &publish_list))
     132            0 :                 ereport(ERROR,
     133              :                         (errcode(ERRCODE_SYNTAX_ERROR),
     134              :                          errmsg("invalid list syntax in parameter \"%s\"",
     135              :                                 "publish")));
     136              : 
     137              :             /* Process the option list. */
     138          167 :             foreach(lc2, publish_list)
     139              :             {
     140          100 :                 char       *publish_opt = (char *) lfirst(lc2);
     141              : 
     142          100 :                 if (strcmp(publish_opt, "insert") == 0)
     143           62 :                     pubactions->pubinsert = true;
     144           38 :                 else if (strcmp(publish_opt, "update") == 0)
     145           15 :                     pubactions->pubupdate = true;
     146           23 :                 else if (strcmp(publish_opt, "delete") == 0)
     147           10 :                     pubactions->pubdelete = true;
     148           13 :                 else if (strcmp(publish_opt, "truncate") == 0)
     149           10 :                     pubactions->pubtruncate = true;
     150              :                 else
     151            3 :                     ereport(ERROR,
     152              :                             (errcode(ERRCODE_SYNTAX_ERROR),
     153              :                              errmsg("unrecognized value for publication option \"%s\": \"%s\"",
     154              :                                     "publish", publish_opt)));
     155              :             }
     156              :         }
     157          130 :         else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
     158              :         {
     159           86 :             if (*publish_via_partition_root_given)
     160            3 :                 errorConflictingDefElem(defel, pstate);
     161           83 :             *publish_via_partition_root_given = true;
     162           83 :             *publish_via_partition_root = defGetBoolean(defel);
     163              :         }
     164           44 :         else if (strcmp(defel->defname, "publish_generated_columns") == 0)
     165              :         {
     166           41 :             if (*publish_generated_columns_given)
     167            3 :                 errorConflictingDefElem(defel, pstate);
     168           38 :             *publish_generated_columns_given = true;
     169           38 :             *publish_generated_columns = defGetGeneratedColsOption(defel);
     170              :         }
     171              :         else
     172            3 :             ereport(ERROR,
     173              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     174              :                      errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
     175              :     }
     176          488 : }
     177              : 
     178              : /*
     179              :  * Convert the PublicationObjSpecType list into schema oid list and
     180              :  * PublicationTable list.
     181              :  */
     182              : static void
     183          850 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
     184              :                            List **rels, List **schemas)
     185              : {
     186              :     ListCell   *cell;
     187              :     PublicationObjSpec *pubobj;
     188              : 
     189          850 :     if (!pubobjspec_list)
     190           52 :         return;
     191              : 
     192         1694 :     foreach(cell, pubobjspec_list)
     193              :     {
     194              :         Oid         schemaid;
     195              :         List       *search_path;
     196              : 
     197          914 :         pubobj = (PublicationObjSpec *) lfirst(cell);
     198              : 
     199          914 :         switch (pubobj->pubobjtype)
     200              :         {
     201          716 :             case PUBLICATIONOBJ_TABLE:
     202          716 :                 *rels = lappend(*rels, pubobj->pubtable);
     203          716 :                 break;
     204          186 :             case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
     205          186 :                 schemaid = get_namespace_oid(pubobj->name, false);
     206              : 
     207              :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     208          171 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     209          171 :                 break;
     210           12 :             case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
     211           12 :                 search_path = fetch_search_path(false);
     212           12 :                 if (search_path == NIL) /* nothing valid in search_path? */
     213            3 :                     ereport(ERROR,
     214              :                             errcode(ERRCODE_UNDEFINED_SCHEMA),
     215              :                             errmsg("no schema has been selected for CURRENT_SCHEMA"));
     216              : 
     217            9 :                 schemaid = linitial_oid(search_path);
     218            9 :                 list_free(search_path);
     219              : 
     220              :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     221            9 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     222            9 :                 break;
     223            0 :             default:
     224              :                 /* shouldn't happen */
     225            0 :                 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
     226              :                 break;
     227              :         }
     228              :     }
     229              : }
     230              : 
     231              : /*
     232              :  * Returns true if any of the columns used in the row filter WHERE expression is
     233              :  * not part of REPLICA IDENTITY, false otherwise.
     234              :  */
     235              : static bool
     236          129 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
     237              : {
     238          129 :     if (node == NULL)
     239            0 :         return false;
     240              : 
     241          129 :     if (IsA(node, Var))
     242              :     {
     243           52 :         Var        *var = (Var *) node;
     244           52 :         AttrNumber  attnum = var->varattno;
     245              : 
     246              :         /*
     247              :          * If pubviaroot is true, we are validating the row filter of the
     248              :          * parent table, but the bitmap contains the replica identity
     249              :          * information of the child table. So, get the column number of the
     250              :          * child table as parent and child column order could be different.
     251              :          */
     252           52 :         if (context->pubviaroot)
     253              :         {
     254            8 :             char       *colname = get_attname(context->parentid, attnum, false);
     255              : 
     256            8 :             attnum = get_attnum(context->relid, colname);
     257              :         }
     258              : 
     259           52 :         if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
     260           52 :                            context->bms_replident))
     261           30 :             return true;
     262              :     }
     263              : 
     264           99 :     return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
     265              :                                   context);
     266              : }
     267              : 
     268              : /*
     269              :  * Check if all columns referenced in the filter expression are part of the
     270              :  * REPLICA IDENTITY index or not.
     271              :  *
     272              :  * Returns true if any invalid column is found.
     273              :  */
     274              : bool
     275          363 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     276              :                                bool pubviaroot)
     277              : {
     278              :     HeapTuple   rftuple;
     279          363 :     Oid         relid = RelationGetRelid(relation);
     280          363 :     Oid         publish_as_relid = RelationGetRelid(relation);
     281          363 :     bool        result = false;
     282              :     Datum       rfdatum;
     283              :     bool        rfisnull;
     284              : 
     285              :     /*
     286              :      * FULL means all columns are in the REPLICA IDENTITY, so all columns are
     287              :      * allowed in the row filter and we can skip the validation.
     288              :      */
     289          363 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     290           99 :         return false;
     291              : 
     292              :     /*
     293              :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     294              :      * is published via this publication as we need to use its row filter
     295              :      * expression to filter the partition's changes.
     296              :      *
     297              :      * Note that even though the row filter used is for an ancestor, the
     298              :      * REPLICA IDENTITY used will be for the actual child table.
     299              :      */
     300          264 :     if (pubviaroot && relation->rd_rel->relispartition)
     301              :     {
     302              :         publish_as_relid
     303           54 :             = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     304              : 
     305           54 :         if (!OidIsValid(publish_as_relid))
     306            3 :             publish_as_relid = relid;
     307              :     }
     308              : 
     309          264 :     rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     310              :                               ObjectIdGetDatum(publish_as_relid),
     311              :                               ObjectIdGetDatum(pubid));
     312              : 
     313          264 :     if (!HeapTupleIsValid(rftuple))
     314           28 :         return false;
     315              : 
     316          236 :     rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     317              :                               Anum_pg_publication_rel_prqual,
     318              :                               &rfisnull);
     319              : 
     320          236 :     if (!rfisnull)
     321              :     {
     322           51 :         rf_context  context = {0};
     323              :         Node       *rfnode;
     324           51 :         Bitmapset  *bms = NULL;
     325              : 
     326           51 :         context.pubviaroot = pubviaroot;
     327           51 :         context.parentid = publish_as_relid;
     328           51 :         context.relid = relid;
     329              : 
     330              :         /* Remember columns that are part of the REPLICA IDENTITY */
     331           51 :         bms = RelationGetIndexAttrBitmap(relation,
     332              :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     333              : 
     334           51 :         context.bms_replident = bms;
     335           51 :         rfnode = stringToNode(TextDatumGetCString(rfdatum));
     336           51 :         result = contain_invalid_rfcolumn_walker(rfnode, &context);
     337              :     }
     338              : 
     339          236 :     ReleaseSysCache(rftuple);
     340              : 
     341          236 :     return result;
     342              : }
     343              : 
     344              : /*
     345              :  * Check for invalid columns in the publication table definition.
     346              :  *
     347              :  * This function evaluates two conditions:
     348              :  *
     349              :  * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
     350              :  *    by the column list. If any column is missing, *invalid_column_list is set
     351              :  *    to true.
     352              :  * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
     353              :  *    are published, either by being explicitly named in the column list or, if
     354              :  *    no column list is specified, by setting the option
     355              :  *    publish_generated_columns to stored. If any unpublished
     356              :  *    generated column is found, *invalid_gen_col is set to true.
     357              :  *
     358              :  * Returns true if any of the above conditions are not met.
     359              :  */
     360              : bool
     361          471 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     362              :                             bool pubviaroot, char pubgencols_type,
     363              :                             bool *invalid_column_list,
     364              :                             bool *invalid_gen_col)
     365              : {
     366          471 :     Oid         relid = RelationGetRelid(relation);
     367          471 :     Oid         publish_as_relid = RelationGetRelid(relation);
     368              :     Bitmapset  *idattrs;
     369          471 :     Bitmapset  *columns = NULL;
     370          471 :     TupleDesc   desc = RelationGetDescr(relation);
     371              :     Publication *pub;
     372              :     int         x;
     373              : 
     374          471 :     *invalid_column_list = false;
     375          471 :     *invalid_gen_col = false;
     376              : 
     377              :     /*
     378              :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     379              :      * is published via this publication as we need to use its column list for
     380              :      * the changes.
     381              :      *
     382              :      * Note that even though the column list used is for an ancestor, the
     383              :      * REPLICA IDENTITY used will be for the actual child table.
     384              :      */
     385          471 :     if (pubviaroot && relation->rd_rel->relispartition)
     386              :     {
     387           80 :         publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     388              : 
     389           80 :         if (!OidIsValid(publish_as_relid))
     390           18 :             publish_as_relid = relid;
     391              :     }
     392              : 
     393              :     /* Fetch the column list */
     394          471 :     pub = GetPublication(pubid);
     395          471 :     check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
     396              : 
     397          471 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     398              :     {
     399              :         /* With REPLICA IDENTITY FULL, no column list is allowed. */
     400          106 :         *invalid_column_list = (columns != NULL);
     401              : 
     402              :         /*
     403              :          * As we don't allow a column list with REPLICA IDENTITY FULL, the
     404              :          * publish_generated_columns option must be set to stored if the table
     405              :          * has any stored generated columns.
     406              :          */
     407          106 :         if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
     408          100 :             relation->rd_att->constr &&
     409           44 :             relation->rd_att->constr->has_generated_stored)
     410            3 :             *invalid_gen_col = true;
     411              : 
     412              :         /*
     413              :          * Virtual generated columns are currently not supported for logical
     414              :          * replication at all.
     415              :          */
     416          106 :         if (relation->rd_att->constr &&
     417           50 :             relation->rd_att->constr->has_generated_virtual)
     418            6 :             *invalid_gen_col = true;
     419              : 
     420          106 :         if (*invalid_gen_col && *invalid_column_list)
     421            0 :             return true;
     422              :     }
     423              : 
     424              :     /* Remember columns that are part of the REPLICA IDENTITY */
     425          471 :     idattrs = RelationGetIndexAttrBitmap(relation,
     426              :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     427              : 
     428              :     /*
     429              :      * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
     430              :      * (to handle system columns the usual way), while column list does not
     431              :      * use offset, so we can't do bms_is_subset(). Instead, we have to loop
     432              :      * over the idattrs and check all of them are in the list.
     433              :      */
     434          471 :     x = -1;
     435          799 :     while ((x = bms_next_member(idattrs, x)) >= 0)
     436              :     {
     437          331 :         AttrNumber  attnum = (x + FirstLowInvalidHeapAttributeNumber);
     438          331 :         Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
     439              : 
     440          331 :         if (columns == NULL)
     441              :         {
     442              :             /*
     443              :              * The publish_generated_columns option must be set to stored if
     444              :              * the REPLICA IDENTITY contains any stored generated column.
     445              :              */
     446          226 :             if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
     447              :             {
     448            3 :                 *invalid_gen_col = true;
     449            3 :                 break;
     450              :             }
     451              : 
     452              :             /*
     453              :              * The equivalent setting for virtual generated columns does not
     454              :              * exist yet.
     455              :              */
     456          223 :             if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
     457              :             {
     458            0 :                 *invalid_gen_col = true;
     459            0 :                 break;
     460              :             }
     461              : 
     462              :             /* Skip validating the column list since it is not defined */
     463          223 :             continue;
     464              :         }
     465              : 
     466              :         /*
     467              :          * If pubviaroot is true, we are validating the column list of the
     468              :          * parent table, but the bitmap contains the replica identity
     469              :          * information of the child table. The parent/child attnums may not
     470              :          * match, so translate them to the parent - get the attname from the
     471              :          * child, and look it up in the parent.
     472              :          */
     473          105 :         if (pubviaroot)
     474              :         {
     475              :             /* attribute name in the child table */
     476           40 :             char       *colname = get_attname(relid, attnum, false);
     477              : 
     478              :             /*
     479              :              * Determine the attnum for the attribute name in parent (we are
     480              :              * using the column list defined on the parent).
     481              :              */
     482           40 :             attnum = get_attnum(publish_as_relid, colname);
     483              :         }
     484              : 
     485              :         /* replica identity column, not covered by the column list */
     486          105 :         *invalid_column_list |= !bms_is_member(attnum, columns);
     487              : 
     488          105 :         if (*invalid_column_list && *invalid_gen_col)
     489            0 :             break;
     490              :     }
     491              : 
     492          471 :     bms_free(columns);
     493          471 :     bms_free(idattrs);
     494              : 
     495          471 :     return *invalid_column_list || *invalid_gen_col;
     496              : }
     497              : 
     498              : /*
     499              :  * Invalidate entries in the RelationSyncCache for relations included in the
     500              :  * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
     501              :  *
     502              :  * If 'puballtables' is true, invalidate all cache entries.
     503              :  */
     504              : void
     505           18 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
     506              : {
     507           18 :     if (puballtables)
     508              :     {
     509            3 :         CacheInvalidateRelSyncAll();
     510              :     }
     511              :     else
     512              :     {
     513           15 :         List       *relids = NIL;
     514           15 :         List       *schemarelids = NIL;
     515              : 
     516              :         /*
     517              :          * For partitioned tables, we must invalidate all partitions and
     518              :          * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
     519              :          * a target. However, WAL records for TRUNCATE specify both a root and
     520              :          * its leaves.
     521              :          */
     522           15 :         relids = GetPublicationRelations(pubid,
     523              :                                          PUBLICATION_PART_ALL);
     524           15 :         schemarelids = GetAllSchemaPublicationRelations(pubid,
     525              :                                                         PUBLICATION_PART_ALL);
     526              : 
     527           15 :         relids = list_concat_unique_oid(relids, schemarelids);
     528              : 
     529              :         /* Invalidate the relsyncache */
     530           33 :         foreach_oid(relid, relids)
     531            3 :             CacheInvalidateRelSync(relid);
     532              :     }
     533              : 
     534           18 :     return;
     535              : }
     536              : 
     537              : /* check_functions_in_node callback */
     538              : static bool
     539          218 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
     540              : {
     541          218 :     return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
     542              :             func_id >= FirstNormalObjectId);
     543              : }
     544              : 
     545              : /*
     546              :  * The row filter walker checks if the row filter expression is a "simple
     547              :  * expression".
     548              :  *
     549              :  * It allows only simple or compound expressions such as:
     550              :  * - (Var Op Const)
     551              :  * - (Var Op Var)
     552              :  * - (Var Op Const) AND/OR (Var Op Const)
     553              :  * - etc
     554              :  * (where Var is a column of the table this filter belongs to)
     555              :  *
     556              :  * The simple expression has the following restrictions:
     557              :  * - User-defined operators are not allowed;
     558              :  * - User-defined functions are not allowed;
     559              :  * - User-defined types are not allowed;
     560              :  * - User-defined collations are not allowed;
     561              :  * - Non-immutable built-in functions are not allowed;
     562              :  * - System columns are not allowed.
     563              :  *
     564              :  * NOTES
     565              :  *
     566              :  * We don't allow user-defined functions/operators/types/collations because
     567              :  * (a) if a user drops a user-defined object used in a row filter expression or
     568              :  * if there is any other error while using it, the logical decoding
     569              :  * infrastructure won't be able to recover from such an error even if the
     570              :  * object is recreated again because a historic snapshot is used to evaluate
     571              :  * the row filter;
     572              :  * (b) a user-defined function can be used to access tables that could have
     573              :  * unpleasant results because a historic snapshot is used. That's why only
     574              :  * immutable built-in functions are allowed in row filter expressions.
     575              :  *
     576              :  * We don't allow system columns because currently, we don't have that
     577              :  * information in the tuple passed to downstream. Also, as we don't replicate
     578              :  * those to subscribers, there doesn't seem to be a need for a filter on those
     579              :  * columns.
     580              :  *
     581              :  * We can allow other node types after more analysis and testing.
     582              :  */
     583              : static bool
     584          720 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
     585              : {
     586          720 :     char       *errdetail_msg = NULL;
     587              : 
     588          720 :     if (node == NULL)
     589            3 :         return false;
     590              : 
     591          717 :     switch (nodeTag(node))
     592              :     {
     593          194 :         case T_Var:
     594              :             /* System columns are not allowed. */
     595          194 :             if (((Var *) node)->varattno < InvalidAttrNumber)
     596            3 :                 errdetail_msg = _("System columns are not allowed.");
     597          194 :             break;
     598          196 :         case T_OpExpr:
     599              :         case T_DistinctExpr:
     600              :         case T_NullIfExpr:
     601              :             /* OK, except user-defined operators are not allowed. */
     602          196 :             if (((OpExpr *) node)->opno >= FirstNormalObjectId)
     603            3 :                 errdetail_msg = _("User-defined operators are not allowed.");
     604          196 :             break;
     605            3 :         case T_ScalarArrayOpExpr:
     606              :             /* OK, except user-defined operators are not allowed. */
     607            3 :             if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
     608            0 :                 errdetail_msg = _("User-defined operators are not allowed.");
     609              : 
     610              :             /*
     611              :              * We don't need to check the hashfuncid and negfuncid of
     612              :              * ScalarArrayOpExpr as those functions are only built for a
     613              :              * subquery.
     614              :              */
     615            3 :             break;
     616            3 :         case T_RowCompareExpr:
     617              :             {
     618              :                 ListCell   *opid;
     619              : 
     620              :                 /* OK, except user-defined operators are not allowed. */
     621            9 :                 foreach(opid, ((RowCompareExpr *) node)->opnos)
     622              :                 {
     623            6 :                     if (lfirst_oid(opid) >= FirstNormalObjectId)
     624              :                     {
     625            0 :                         errdetail_msg = _("User-defined operators are not allowed.");
     626            0 :                         break;
     627              :                     }
     628              :                 }
     629              :             }
     630            3 :             break;
     631          318 :         case T_Const:
     632              :         case T_FuncExpr:
     633              :         case T_BoolExpr:
     634              :         case T_RelabelType:
     635              :         case T_CollateExpr:
     636              :         case T_CaseExpr:
     637              :         case T_CaseTestExpr:
     638              :         case T_ArrayExpr:
     639              :         case T_RowExpr:
     640              :         case T_CoalesceExpr:
     641              :         case T_MinMaxExpr:
     642              :         case T_XmlExpr:
     643              :         case T_NullTest:
     644              :         case T_BooleanTest:
     645              :         case T_List:
     646              :             /* OK, supported */
     647          318 :             break;
     648            3 :         default:
     649            3 :             errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
     650            3 :             break;
     651              :     }
     652              : 
     653              :     /*
     654              :      * For all the supported nodes, if we haven't already found a problem,
     655              :      * check the types, functions, and collations used in it.  We check List
     656              :      * by walking through each element.
     657              :      */
     658          717 :     if (!errdetail_msg && !IsA(node, List))
     659              :     {
     660          681 :         if (exprType(node) >= FirstNormalObjectId)
     661            3 :             errdetail_msg = _("User-defined types are not allowed.");
     662          678 :         else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
     663              :                                          pstate))
     664            6 :             errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
     665         1344 :         else if (exprCollation(node) >= FirstNormalObjectId ||
     666          672 :                  exprInputCollation(node) >= FirstNormalObjectId)
     667            3 :             errdetail_msg = _("User-defined collations are not allowed.");
     668              :     }
     669              : 
     670              :     /*
     671              :      * If we found a problem in this node, throw error now. Otherwise keep
     672              :      * going.
     673              :      */
     674          717 :     if (errdetail_msg)
     675           21 :         ereport(ERROR,
     676              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     677              :                  errmsg("invalid publication WHERE expression"),
     678              :                  errdetail_internal("%s", errdetail_msg),
     679              :                  parser_errposition(pstate, exprLocation(node))));
     680              : 
     681          696 :     return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
     682              :                                   pstate);
     683              : }
     684              : 
     685              : /*
     686              :  * Check if the row filter expression is a "simple expression".
     687              :  *
     688              :  * See check_simple_rowfilter_expr_walker for details.
     689              :  */
     690              : static bool
     691          188 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
     692              : {
     693          188 :     return check_simple_rowfilter_expr_walker(node, pstate);
     694              : }
     695              : 
     696              : /*
     697              :  * Transform the publication WHERE expression for all the relations in the list,
     698              :  * ensuring it is coerced to boolean and necessary collation information is
     699              :  * added if required, and add a new nsitem/RTE for the associated relation to
     700              :  * the ParseState's namespace list.
     701              :  *
     702              :  * Also check the publication row filter expression and throw an error if
     703              :  * anything not permitted or unexpected is encountered.
     704              :  */
     705              : static void
     706          593 : TransformPubWhereClauses(List *tables, const char *queryString,
     707              :                          bool pubviaroot)
     708              : {
     709              :     ListCell   *lc;
     710              : 
     711         1194 :     foreach(lc, tables)
     712              :     {
     713              :         ParseNamespaceItem *nsitem;
     714          631 :         Node       *whereclause = NULL;
     715              :         ParseState *pstate;
     716          631 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     717              : 
     718          631 :         if (pri->whereClause == NULL)
     719          434 :             continue;
     720              : 
     721              :         /*
     722              :          * If the publication doesn't publish changes via the root partitioned
     723              :          * table, the partition's row filter will be used. So disallow using
     724              :          * WHERE clause on partitioned table in this case.
     725              :          */
     726          197 :         if (!pubviaroot &&
     727          182 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     728            3 :             ereport(ERROR,
     729              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     730              :                      errmsg("cannot use publication WHERE clause for relation \"%s\"",
     731              :                             RelationGetRelationName(pri->relation)),
     732              :                      errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
     733              :                                "publish_via_partition_root")));
     734              : 
     735              :         /*
     736              :          * A fresh pstate is required so that we only have "this" table in its
     737              :          * rangetable
     738              :          */
     739          194 :         pstate = make_parsestate(NULL);
     740          194 :         pstate->p_sourcetext = queryString;
     741          194 :         nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
     742              :                                                AccessShareLock, NULL,
     743              :                                                false, false);
     744          194 :         addNSItemToQuery(pstate, nsitem, false, true, true);
     745              : 
     746          194 :         whereclause = transformWhereClause(pstate,
     747          194 :                                            copyObject(pri->whereClause),
     748              :                                            EXPR_KIND_WHERE,
     749              :                                            "PUBLICATION WHERE");
     750              : 
     751              :         /* Fix up collation information */
     752          188 :         assign_expr_collations(pstate, whereclause);
     753              : 
     754          188 :         whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
     755              : 
     756              :         /*
     757              :          * We allow only simple expressions in row filters. See
     758              :          * check_simple_rowfilter_expr_walker.
     759              :          */
     760          188 :         check_simple_rowfilter_expr(whereclause, pstate);
     761              : 
     762          167 :         free_parsestate(pstate);
     763              : 
     764          167 :         pri->whereClause = whereclause;
     765              :     }
     766          563 : }
     767              : 
     768              : 
     769              : /*
     770              :  * Given a list of tables that are going to be added to a publication,
     771              :  * verify that they fulfill the necessary preconditions, namely: no tables
     772              :  * have a column list if any schema is published; and partitioned tables do
     773              :  * not have column lists if publish_via_partition_root is not set.
     774              :  *
     775              :  * 'publish_schema' indicates that the publication contains any TABLES IN
     776              :  * SCHEMA elements (newly added in this command, or preexisting).
     777              :  * 'pubviaroot' is the value of publish_via_partition_root.
     778              :  */
     779              : static void
     780          563 : CheckPubRelationColumnList(char *pubname, List *tables,
     781              :                            bool publish_schema, bool pubviaroot)
     782              : {
     783              :     ListCell   *lc;
     784              : 
     785         1149 :     foreach(lc, tables)
     786              :     {
     787          601 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     788              : 
     789          601 :         if (pri->columns == NIL)
     790          399 :             continue;
     791              : 
     792              :         /*
     793              :          * Disallow specifying column list if any schema is in the
     794              :          * publication.
     795              :          *
     796              :          * XXX We could instead just forbid the case when the publication
     797              :          * tries to publish the table with a column list and a schema for that
     798              :          * table. However, if we do that then we need a restriction during
     799              :          * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
     800              :          * seem to be a good idea.
     801              :          */
     802          202 :         if (publish_schema)
     803           12 :             ereport(ERROR,
     804              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     805              :                     errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     806              :                            get_namespace_name(RelationGetNamespace(pri->relation)),
     807              :                            RelationGetRelationName(pri->relation), pubname),
     808              :                     errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
     809              : 
     810              :         /*
     811              :          * If the publication doesn't publish changes via the root partitioned
     812              :          * table, the partition's column list will be used. So disallow using
     813              :          * a column list on the partitioned table in this case.
     814              :          */
     815          190 :         if (!pubviaroot &&
     816          152 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     817            3 :             ereport(ERROR,
     818              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     819              :                      errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     820              :                             get_namespace_name(RelationGetNamespace(pri->relation)),
     821              :                             RelationGetRelationName(pri->relation), pubname),
     822              :                      errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
     823              :                                "publish_via_partition_root")));
     824              :     }
     825          548 : }
     826              : 
     827              : /*
     828              :  * Create new publication.
     829              :  */
     830              : ObjectAddress
     831          448 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
     832              : {
     833              :     Relation    rel;
     834              :     ObjectAddress myself;
     835              :     Oid         puboid;
     836              :     bool        nulls[Natts_pg_publication];
     837              :     Datum       values[Natts_pg_publication];
     838              :     HeapTuple   tup;
     839              :     bool        publish_given;
     840              :     PublicationActions pubactions;
     841              :     bool        publish_via_partition_root_given;
     842              :     bool        publish_via_partition_root;
     843              :     bool        publish_generated_columns_given;
     844              :     char        publish_generated_columns;
     845              :     AclResult   aclresult;
     846          448 :     List       *relations = NIL;
     847          448 :     List       *schemaidlist = NIL;
     848              : 
     849              :     /* must have CREATE privilege on database */
     850          448 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
     851          448 :     if (aclresult != ACLCHECK_OK)
     852            3 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     853            3 :                        get_database_name(MyDatabaseId));
     854              : 
     855              :     /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
     856          445 :     if (!superuser())
     857              :     {
     858           12 :         if (stmt->for_all_tables || stmt->for_all_sequences)
     859            0 :             ereport(ERROR,
     860              :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     861              :                     errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
     862              :     }
     863              : 
     864          445 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     865              : 
     866              :     /* Check if name is used */
     867          445 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     868              :                              CStringGetDatum(stmt->pubname));
     869          445 :     if (OidIsValid(puboid))
     870            3 :         ereport(ERROR,
     871              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     872              :                  errmsg("publication \"%s\" already exists",
     873              :                         stmt->pubname)));
     874              : 
     875              :     /* Form a tuple. */
     876          442 :     memset(values, 0, sizeof(values));
     877          442 :     memset(nulls, false, sizeof(nulls));
     878              : 
     879          442 :     values[Anum_pg_publication_pubname - 1] =
     880          442 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
     881          442 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
     882              : 
     883          442 :     parse_publication_options(pstate,
     884              :                               stmt->options,
     885              :                               &publish_given, &pubactions,
     886              :                               &publish_via_partition_root_given,
     887              :                               &publish_via_partition_root,
     888              :                               &publish_generated_columns_given,
     889              :                               &publish_generated_columns);
     890              : 
     891          424 :     if (stmt->for_all_sequences &&
     892           14 :         (publish_given || publish_via_partition_root_given ||
     893              :          publish_generated_columns_given))
     894            1 :         ereport(NOTICE,
     895              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     896              :                 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
     897              : 
     898          424 :     puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
     899              :                                 Anum_pg_publication_oid);
     900          424 :     values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
     901          424 :     values[Anum_pg_publication_puballtables - 1] =
     902          424 :         BoolGetDatum(stmt->for_all_tables);
     903          424 :     values[Anum_pg_publication_puballsequences - 1] =
     904          424 :         BoolGetDatum(stmt->for_all_sequences);
     905          424 :     values[Anum_pg_publication_pubinsert - 1] =
     906          424 :         BoolGetDatum(pubactions.pubinsert);
     907          424 :     values[Anum_pg_publication_pubupdate - 1] =
     908          424 :         BoolGetDatum(pubactions.pubupdate);
     909          424 :     values[Anum_pg_publication_pubdelete - 1] =
     910          424 :         BoolGetDatum(pubactions.pubdelete);
     911          424 :     values[Anum_pg_publication_pubtruncate - 1] =
     912          424 :         BoolGetDatum(pubactions.pubtruncate);
     913          424 :     values[Anum_pg_publication_pubviaroot - 1] =
     914          424 :         BoolGetDatum(publish_via_partition_root);
     915          424 :     values[Anum_pg_publication_pubgencols - 1] =
     916          424 :         CharGetDatum(publish_generated_columns);
     917              : 
     918          424 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     919              : 
     920              :     /* Insert tuple into catalog. */
     921          424 :     CatalogTupleInsert(rel, tup);
     922          424 :     heap_freetuple(tup);
     923              : 
     924          424 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
     925              : 
     926          424 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
     927              : 
     928              :     /* Make the changes visible. */
     929          424 :     CommandCounterIncrement();
     930              : 
     931              :     /* Associate objects with the publication. */
     932          424 :     if (stmt->for_all_tables)
     933              :     {
     934              :         /*
     935              :          * Invalidate relcache so that publication info is rebuilt. Sequences
     936              :          * publication doesn't require invalidation, as replica identity
     937              :          * checks don't apply to them.
     938              :          */
     939           53 :         CacheInvalidateRelcacheAll();
     940              :     }
     941          371 :     else if (!stmt->for_all_sequences)
     942              :     {
     943          361 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
     944              :                                    &schemaidlist);
     945              : 
     946              :         /* FOR TABLES IN SCHEMA requires superuser */
     947          352 :         if (schemaidlist != NIL && !superuser())
     948            3 :             ereport(ERROR,
     949              :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     950              :                     errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
     951              : 
     952          349 :         if (relations != NIL)
     953              :         {
     954              :             List       *rels;
     955              : 
     956          237 :             rels = OpenTableList(relations);
     957          222 :             TransformPubWhereClauses(rels, pstate->p_sourcetext,
     958              :                                      publish_via_partition_root);
     959              : 
     960          210 :             CheckPubRelationColumnList(stmt->pubname, rels,
     961              :                                        schemaidlist != NIL,
     962              :                                        publish_via_partition_root);
     963              : 
     964          207 :             PublicationAddTables(puboid, rels, true, NULL);
     965          194 :             CloseTableList(rels);
     966              :         }
     967              : 
     968          306 :         if (schemaidlist != NIL)
     969              :         {
     970              :             /*
     971              :              * Schema lock is held until the publication is created to prevent
     972              :              * concurrent schema deletion.
     973              :              */
     974           76 :             LockSchemaList(schemaidlist);
     975           76 :             PublicationAddSchemas(puboid, schemaidlist, true, NULL);
     976              :         }
     977              :     }
     978              : 
     979          366 :     table_close(rel, RowExclusiveLock);
     980              : 
     981          366 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
     982              : 
     983              :     /*
     984              :      * We don't need this warning message when wal_level >= 'replica' since
     985              :      * logical decoding is automatically enabled up on a logical slot
     986              :      * creation.
     987              :      */
     988          366 :     if (wal_level < WAL_LEVEL_REPLICA)
     989           13 :         ereport(WARNING,
     990              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     991              :                  errmsg("logical decoding must be enabled to publish logical changes"),
     992              :                  errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
     993              : 
     994          366 :     return myself;
     995              : }
     996              : 
     997              : /*
     998              :  * Change options of a publication.
     999              :  */
    1000              : static void
    1001           64 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
    1002              :                         Relation rel, HeapTuple tup)
    1003              : {
    1004              :     bool        nulls[Natts_pg_publication];
    1005              :     bool        replaces[Natts_pg_publication];
    1006              :     Datum       values[Natts_pg_publication];
    1007              :     bool        publish_given;
    1008              :     PublicationActions pubactions;
    1009              :     bool        publish_via_partition_root_given;
    1010              :     bool        publish_via_partition_root;
    1011              :     bool        publish_generated_columns_given;
    1012              :     char        publish_generated_columns;
    1013              :     ObjectAddress obj;
    1014              :     Form_pg_publication pubform;
    1015           64 :     List       *root_relids = NIL;
    1016              :     ListCell   *lc;
    1017              : 
    1018           64 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1019              : 
    1020           64 :     parse_publication_options(pstate,
    1021              :                               stmt->options,
    1022              :                               &publish_given, &pubactions,
    1023              :                               &publish_via_partition_root_given,
    1024              :                               &publish_via_partition_root,
    1025              :                               &publish_generated_columns_given,
    1026              :                               &publish_generated_columns);
    1027              : 
    1028           64 :     if (pubform->puballsequences &&
    1029            6 :         (publish_given || publish_via_partition_root_given ||
    1030              :          publish_generated_columns_given))
    1031            6 :         ereport(NOTICE,
    1032              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1033              :                 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
    1034              : 
    1035              :     /*
    1036              :      * If the publication doesn't publish changes via the root partitioned
    1037              :      * table, the partition's row filter and column list will be used. So
    1038              :      * disallow using WHERE clause and column lists on partitioned table in
    1039              :      * this case.
    1040              :      */
    1041           64 :     if (!pubform->puballtables && publish_via_partition_root_given &&
    1042           40 :         !publish_via_partition_root)
    1043              :     {
    1044              :         /*
    1045              :          * Lock the publication so nobody else can do anything with it. This
    1046              :          * prevents concurrent alter to add partitioned table(s) with WHERE
    1047              :          * clause(s) and/or column lists which we don't allow when not
    1048              :          * publishing via root.
    1049              :          */
    1050           24 :         LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
    1051              :                            AccessShareLock);
    1052              : 
    1053           24 :         root_relids = GetPublicationRelations(pubform->oid,
    1054              :                                               PUBLICATION_PART_ROOT);
    1055              : 
    1056           42 :         foreach(lc, root_relids)
    1057              :         {
    1058           24 :             Oid         relid = lfirst_oid(lc);
    1059              :             HeapTuple   rftuple;
    1060              :             char        relkind;
    1061              :             char       *relname;
    1062              :             bool        has_rowfilter;
    1063              :             bool        has_collist;
    1064              : 
    1065              :             /*
    1066              :              * Beware: we don't have lock on the relations, so cope silently
    1067              :              * with the cache lookups returning NULL.
    1068              :              */
    1069              : 
    1070           24 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1071              :                                       ObjectIdGetDatum(relid),
    1072              :                                       ObjectIdGetDatum(pubform->oid));
    1073           24 :             if (!HeapTupleIsValid(rftuple))
    1074            0 :                 continue;
    1075           24 :             has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
    1076           24 :             has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
    1077           24 :             if (!has_rowfilter && !has_collist)
    1078              :             {
    1079            6 :                 ReleaseSysCache(rftuple);
    1080            6 :                 continue;
    1081              :             }
    1082              : 
    1083           18 :             relkind = get_rel_relkind(relid);
    1084           18 :             if (relkind != RELKIND_PARTITIONED_TABLE)
    1085              :             {
    1086           12 :                 ReleaseSysCache(rftuple);
    1087           12 :                 continue;
    1088              :             }
    1089            6 :             relname = get_rel_name(relid);
    1090            6 :             if (relname == NULL)    /* table concurrently dropped */
    1091              :             {
    1092            0 :                 ReleaseSysCache(rftuple);
    1093            0 :                 continue;
    1094              :             }
    1095              : 
    1096            6 :             if (has_rowfilter)
    1097            3 :                 ereport(ERROR,
    1098              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1099              :                          errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1100              :                                 "publish_via_partition_root",
    1101              :                                 stmt->pubname),
    1102              :                          errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1103              :                                    relname, "publish_via_partition_root")));
    1104              :             Assert(has_collist);
    1105            3 :             ereport(ERROR,
    1106              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1107              :                      errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1108              :                             "publish_via_partition_root",
    1109              :                             stmt->pubname),
    1110              :                      errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1111              :                                relname, "publish_via_partition_root")));
    1112              :         }
    1113              :     }
    1114              : 
    1115              :     /* Everything ok, form a new tuple. */
    1116           58 :     memset(values, 0, sizeof(values));
    1117           58 :     memset(nulls, false, sizeof(nulls));
    1118           58 :     memset(replaces, false, sizeof(replaces));
    1119              : 
    1120           58 :     if (publish_given)
    1121              :     {
    1122           17 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
    1123           17 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
    1124              : 
    1125           17 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
    1126           17 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
    1127              : 
    1128           17 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
    1129           17 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
    1130              : 
    1131           17 :         values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
    1132           17 :         replaces[Anum_pg_publication_pubtruncate - 1] = true;
    1133              :     }
    1134              : 
    1135           58 :     if (publish_via_partition_root_given)
    1136              :     {
    1137           35 :         values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
    1138           35 :         replaces[Anum_pg_publication_pubviaroot - 1] = true;
    1139              :     }
    1140              : 
    1141           58 :     if (publish_generated_columns_given)
    1142              :     {
    1143            6 :         values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
    1144            6 :         replaces[Anum_pg_publication_pubgencols - 1] = true;
    1145              :     }
    1146              : 
    1147           58 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1148              :                             replaces);
    1149              : 
    1150              :     /* Update the catalog. */
    1151           58 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1152              : 
    1153           58 :     CommandCounterIncrement();
    1154              : 
    1155           58 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1156              : 
    1157              :     /* Invalidate the relcache. */
    1158           58 :     if (pubform->puballtables)
    1159              :     {
    1160           10 :         CacheInvalidateRelcacheAll();
    1161              :     }
    1162              :     else
    1163              :     {
    1164           48 :         List       *relids = NIL;
    1165           48 :         List       *schemarelids = NIL;
    1166              : 
    1167              :         /*
    1168              :          * For any partitioned tables contained in the publication, we must
    1169              :          * invalidate all partitions contained in the respective partition
    1170              :          * trees, not just those explicitly mentioned in the publication.
    1171              :          */
    1172           48 :         if (root_relids == NIL)
    1173           30 :             relids = GetPublicationRelations(pubform->oid,
    1174              :                                              PUBLICATION_PART_ALL);
    1175              :         else
    1176              :         {
    1177              :             /*
    1178              :              * We already got tables explicitly mentioned in the publication.
    1179              :              * Now get all partitions for the partitioned table in the list.
    1180              :              */
    1181           36 :             foreach(lc, root_relids)
    1182           18 :                 relids = GetPubPartitionOptionRelations(relids,
    1183              :                                                         PUBLICATION_PART_ALL,
    1184              :                                                         lfirst_oid(lc));
    1185              :         }
    1186              : 
    1187           48 :         schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
    1188              :                                                         PUBLICATION_PART_ALL);
    1189           48 :         relids = list_concat_unique_oid(relids, schemarelids);
    1190              : 
    1191           48 :         InvalidatePublicationRels(relids);
    1192              :     }
    1193              : 
    1194           58 :     ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
    1195           58 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1196              :                                      (Node *) stmt);
    1197              : 
    1198           58 :     InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
    1199           58 : }
    1200              : 
    1201              : /*
    1202              :  * Invalidate the relations.
    1203              :  */
    1204              : void
    1205         1225 : InvalidatePublicationRels(List *relids)
    1206              : {
    1207              :     /*
    1208              :      * We don't want to send too many individual messages, at some point it's
    1209              :      * cheaper to just reset whole relcache.
    1210              :      */
    1211         1225 :     if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
    1212              :     {
    1213              :         ListCell   *lc;
    1214              : 
    1215        10337 :         foreach(lc, relids)
    1216         9112 :             CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
    1217              :     }
    1218              :     else
    1219            0 :         CacheInvalidateRelcacheAll();
    1220         1225 : }
    1221              : 
    1222              : /*
    1223              :  * Add or remove table to/from publication.
    1224              :  */
    1225              : static void
    1226          459 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
    1227              :                        List *tables, const char *queryString,
    1228              :                        bool publish_schema)
    1229              : {
    1230          459 :     List       *rels = NIL;
    1231          459 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1232          459 :     Oid         pubid = pubform->oid;
    1233              : 
    1234              :     /*
    1235              :      * Nothing to do if no objects, except in SET: for that it is quite
    1236              :      * possible that user has not specified any tables in which case we need
    1237              :      * to remove all the existing tables.
    1238              :      */
    1239          459 :     if (!tables && stmt->action != AP_SetObjects)
    1240           38 :         return;
    1241              : 
    1242          421 :     rels = OpenTableList(tables);
    1243              : 
    1244          421 :     if (stmt->action == AP_AddObjects)
    1245              :     {
    1246          148 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1247              : 
    1248          139 :         publish_schema |= is_schema_publication(pubid);
    1249              : 
    1250          139 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1251          139 :                                    pubform->pubviaroot);
    1252              : 
    1253          133 :         PublicationAddTables(pubid, rels, false, stmt);
    1254              :     }
    1255          273 :     else if (stmt->action == AP_DropObjects)
    1256           50 :         PublicationDropTables(pubid, rels, false);
    1257              :     else                        /* AP_SetObjects */
    1258              :     {
    1259          223 :         List       *oldrelids = GetPublicationRelations(pubid,
    1260              :                                                         PUBLICATION_PART_ROOT);
    1261          223 :         List       *delrels = NIL;
    1262              :         ListCell   *oldlc;
    1263              : 
    1264          223 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1265              : 
    1266          214 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1267          214 :                                    pubform->pubviaroot);
    1268              : 
    1269              :         /*
    1270              :          * To recreate the relation list for the publication, look for
    1271              :          * existing relations that do not need to be dropped.
    1272              :          */
    1273          403 :         foreach(oldlc, oldrelids)
    1274              :         {
    1275          201 :             Oid         oldrelid = lfirst_oid(oldlc);
    1276              :             ListCell   *newlc;
    1277              :             PublicationRelInfo *oldrel;
    1278          201 :             bool        found = false;
    1279              :             HeapTuple   rftuple;
    1280          201 :             Node       *oldrelwhereclause = NULL;
    1281          201 :             Bitmapset  *oldcolumns = NULL;
    1282              : 
    1283              :             /* look up the cache for the old relmap */
    1284          201 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1285              :                                       ObjectIdGetDatum(oldrelid),
    1286              :                                       ObjectIdGetDatum(pubid));
    1287              : 
    1288              :             /*
    1289              :              * See if the existing relation currently has a WHERE clause or a
    1290              :              * column list. We need to compare those too.
    1291              :              */
    1292          201 :             if (HeapTupleIsValid(rftuple))
    1293              :             {
    1294          201 :                 bool        isnull = true;
    1295              :                 Datum       whereClauseDatum;
    1296              :                 Datum       columnListDatum;
    1297              : 
    1298              :                 /* Load the WHERE clause for this table. */
    1299          201 :                 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1300              :                                                    Anum_pg_publication_rel_prqual,
    1301              :                                                    &isnull);
    1302          201 :                 if (!isnull)
    1303          105 :                     oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
    1304              : 
    1305              :                 /* Transform the int2vector column list to a bitmap. */
    1306          201 :                 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1307              :                                                   Anum_pg_publication_rel_prattrs,
    1308              :                                                   &isnull);
    1309              : 
    1310          201 :                 if (!isnull)
    1311           68 :                     oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
    1312              : 
    1313          201 :                 ReleaseSysCache(rftuple);
    1314              :             }
    1315              : 
    1316          389 :             foreach(newlc, rels)
    1317              :             {
    1318              :                 PublicationRelInfo *newpubrel;
    1319              :                 Oid         newrelid;
    1320          202 :                 Bitmapset  *newcolumns = NULL;
    1321              : 
    1322          202 :                 newpubrel = (PublicationRelInfo *) lfirst(newlc);
    1323          202 :                 newrelid = RelationGetRelid(newpubrel->relation);
    1324              : 
    1325              :                 /*
    1326              :                  * Validate the column list.  If the column list or WHERE
    1327              :                  * clause changes, then the validation done here will be
    1328              :                  * duplicated inside PublicationAddTables().  The validation
    1329              :                  * is cheap enough that that seems harmless.
    1330              :                  */
    1331          202 :                 newcolumns = pub_collist_validate(newpubrel->relation,
    1332              :                                                   newpubrel->columns);
    1333              : 
    1334              :                 /*
    1335              :                  * Check if any of the new set of relations matches with the
    1336              :                  * existing relations in the publication. Additionally, if the
    1337              :                  * relation has an associated WHERE clause, check the WHERE
    1338              :                  * expressions also match. Same for the column list. Drop the
    1339              :                  * rest.
    1340              :                  */
    1341          196 :                 if (newrelid == oldrelid)
    1342              :                 {
    1343          142 :                     if (equal(oldrelwhereclause, newpubrel->whereClause) &&
    1344           40 :                         bms_equal(oldcolumns, newcolumns))
    1345              :                     {
    1346            8 :                         found = true;
    1347            8 :                         break;
    1348              :                     }
    1349              :                 }
    1350              :             }
    1351              : 
    1352              :             /*
    1353              :              * Add the non-matched relations to a list so that they can be
    1354              :              * dropped.
    1355              :              */
    1356          195 :             if (!found)
    1357              :             {
    1358          187 :                 oldrel = palloc_object(PublicationRelInfo);
    1359          187 :                 oldrel->whereClause = NULL;
    1360          187 :                 oldrel->columns = NIL;
    1361          187 :                 oldrel->relation = table_open(oldrelid,
    1362              :                                               ShareUpdateExclusiveLock);
    1363          187 :                 delrels = lappend(delrels, oldrel);
    1364              :             }
    1365              :         }
    1366              : 
    1367              :         /* And drop them. */
    1368          202 :         PublicationDropTables(pubid, delrels, true);
    1369              : 
    1370              :         /*
    1371              :          * Don't bother calculating the difference for adding, we'll catch and
    1372              :          * skip existing ones when doing catalog update.
    1373              :          */
    1374          202 :         PublicationAddTables(pubid, rels, true, stmt);
    1375              : 
    1376          202 :         CloseTableList(delrels);
    1377              :     }
    1378              : 
    1379          355 :     CloseTableList(rels);
    1380              : }
    1381              : 
    1382              : /*
    1383              :  * Alter the publication schemas.
    1384              :  *
    1385              :  * Add or remove schemas to/from publication.
    1386              :  */
    1387              : static void
    1388          393 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
    1389              :                         HeapTuple tup, List *schemaidlist)
    1390              : {
    1391          393 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1392              : 
    1393              :     /*
    1394              :      * Nothing to do if no objects, except in SET: for that it is quite
    1395              :      * possible that user has not specified any schemas in which case we need
    1396              :      * to remove all the existing schemas.
    1397              :      */
    1398          393 :     if (!schemaidlist && stmt->action != AP_SetObjects)
    1399          153 :         return;
    1400              : 
    1401              :     /*
    1402              :      * Schema lock is held until the publication is altered to prevent
    1403              :      * concurrent schema deletion.
    1404              :      */
    1405          240 :     LockSchemaList(schemaidlist);
    1406          240 :     if (stmt->action == AP_AddObjects)
    1407              :     {
    1408              :         ListCell   *lc;
    1409              :         List       *reloids;
    1410              : 
    1411           19 :         reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
    1412              : 
    1413           28 :         foreach(lc, reloids)
    1414              :         {
    1415              :             HeapTuple   coltuple;
    1416              : 
    1417           12 :             coltuple = SearchSysCache2(PUBLICATIONRELMAP,
    1418              :                                        ObjectIdGetDatum(lfirst_oid(lc)),
    1419              :                                        ObjectIdGetDatum(pubform->oid));
    1420              : 
    1421           12 :             if (!HeapTupleIsValid(coltuple))
    1422            0 :                 continue;
    1423              : 
    1424              :             /*
    1425              :              * Disallow adding schema if column list is already part of the
    1426              :              * publication. See CheckPubRelationColumnList.
    1427              :              */
    1428           12 :             if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
    1429            3 :                 ereport(ERROR,
    1430              :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1431              :                         errmsg("cannot add schema to publication \"%s\"",
    1432              :                                stmt->pubname),
    1433              :                         errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
    1434              : 
    1435            9 :             ReleaseSysCache(coltuple);
    1436              :         }
    1437              : 
    1438           16 :         PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
    1439              :     }
    1440          221 :     else if (stmt->action == AP_DropObjects)
    1441           19 :         PublicationDropSchemas(pubform->oid, schemaidlist, false);
    1442              :     else                        /* AP_SetObjects */
    1443              :     {
    1444          202 :         List       *oldschemaids = GetPublicationSchemas(pubform->oid);
    1445          202 :         List       *delschemas = NIL;
    1446              : 
    1447              :         /* Identify which schemas should be dropped */
    1448          202 :         delschemas = list_difference_oid(oldschemaids, schemaidlist);
    1449              : 
    1450              :         /*
    1451              :          * Schema lock is held until the publication is altered to prevent
    1452              :          * concurrent schema deletion.
    1453              :          */
    1454          202 :         LockSchemaList(delschemas);
    1455              : 
    1456              :         /* And drop them */
    1457          202 :         PublicationDropSchemas(pubform->oid, delschemas, true);
    1458              : 
    1459              :         /*
    1460              :          * Don't bother calculating the difference for adding, we'll catch and
    1461              :          * skip existing ones when doing catalog update.
    1462              :          */
    1463          202 :         PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
    1464              :     }
    1465              : }
    1466              : 
    1467              : /*
    1468              :  * Check if relations and schemas can be in a given publication and throw
    1469              :  * appropriate error if not.
    1470              :  */
    1471              : static void
    1472          480 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
    1473              :                       List *tables, List *schemaidlist)
    1474              : {
    1475          480 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1476              : 
    1477          480 :     if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
    1478           52 :         schemaidlist && !superuser())
    1479            3 :         ereport(ERROR,
    1480              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1481              :                  errmsg("must be superuser to add or set schemas")));
    1482              : 
    1483              :     /*
    1484              :      * Check that user is allowed to manipulate the publication tables in
    1485              :      * schema
    1486              :      */
    1487          477 :     if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
    1488              :     {
    1489            9 :         if (pubform->puballtables && pubform->puballsequences)
    1490            0 :             ereport(ERROR,
    1491              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1492              :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
    1493              :                            NameStr(pubform->pubname)),
    1494              :                     errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
    1495            9 :         else if (pubform->puballtables)
    1496            9 :             ereport(ERROR,
    1497              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1498              :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1499              :                            NameStr(pubform->pubname)),
    1500              :                     errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
    1501              :         else
    1502            0 :             ereport(ERROR,
    1503              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1504              :                     errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
    1505              :                            NameStr(pubform->pubname)),
    1506              :                     errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
    1507              :     }
    1508              : 
    1509              :     /* Check that user is allowed to manipulate the publication tables. */
    1510          468 :     if (tables && (pubform->puballtables || pubform->puballsequences))
    1511              :     {
    1512            9 :         if (pubform->puballtables && pubform->puballsequences)
    1513            0 :             ereport(ERROR,
    1514              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1515              :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
    1516              :                            NameStr(pubform->pubname)),
    1517              :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
    1518            9 :         else if (pubform->puballtables)
    1519            9 :             ereport(ERROR,
    1520              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1521              :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1522              :                            NameStr(pubform->pubname)),
    1523              :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
    1524              :         else
    1525            0 :             ereport(ERROR,
    1526              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1527              :                     errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
    1528              :                            NameStr(pubform->pubname)),
    1529              :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
    1530              :     }
    1531          459 : }
    1532              : 
    1533              : /*
    1534              :  * Alter the existing publication.
    1535              :  *
    1536              :  * This is dispatcher function for AlterPublicationOptions,
    1537              :  * AlterPublicationSchemas and AlterPublicationTables.
    1538              :  */
    1539              : void
    1540          553 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
    1541              : {
    1542              :     Relation    rel;
    1543              :     HeapTuple   tup;
    1544              :     Form_pg_publication pubform;
    1545              : 
    1546          553 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1547              : 
    1548          553 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
    1549              :                               CStringGetDatum(stmt->pubname));
    1550              : 
    1551          553 :     if (!HeapTupleIsValid(tup))
    1552            0 :         ereport(ERROR,
    1553              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1554              :                  errmsg("publication \"%s\" does not exist",
    1555              :                         stmt->pubname)));
    1556              : 
    1557          553 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1558              : 
    1559              :     /* must be owner */
    1560          553 :     if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
    1561            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1562            0 :                        stmt->pubname);
    1563              : 
    1564          553 :     if (stmt->options)
    1565           64 :         AlterPublicationOptions(pstate, stmt, rel, tup);
    1566              :     else
    1567              :     {
    1568          489 :         List       *relations = NIL;
    1569          489 :         List       *schemaidlist = NIL;
    1570          489 :         Oid         pubid = pubform->oid;
    1571              : 
    1572          489 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
    1573              :                                    &schemaidlist);
    1574              : 
    1575          480 :         CheckAlterPublication(stmt, tup, relations, schemaidlist);
    1576              : 
    1577          459 :         heap_freetuple(tup);
    1578              : 
    1579              :         /* Lock the publication so nobody else can do anything with it. */
    1580          459 :         LockDatabaseObject(PublicationRelationId, pubid, 0,
    1581              :                            AccessExclusiveLock);
    1582              : 
    1583              :         /*
    1584              :          * It is possible that by the time we acquire the lock on publication,
    1585              :          * concurrent DDL has removed it. We can test this by checking the
    1586              :          * existence of publication. We get the tuple again to avoid the risk
    1587              :          * of any publication option getting changed.
    1588              :          */
    1589          459 :         tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1590          459 :         if (!HeapTupleIsValid(tup))
    1591            0 :             ereport(ERROR,
    1592              :                     errcode(ERRCODE_UNDEFINED_OBJECT),
    1593              :                     errmsg("publication \"%s\" does not exist",
    1594              :                            stmt->pubname));
    1595              : 
    1596          459 :         AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
    1597              :                                schemaidlist != NIL);
    1598          393 :         AlterPublicationSchemas(stmt, tup, schemaidlist);
    1599              :     }
    1600              : 
    1601              :     /* Cleanup. */
    1602          442 :     heap_freetuple(tup);
    1603          442 :     table_close(rel, RowExclusiveLock);
    1604          442 : }
    1605              : 
    1606              : /*
    1607              :  * Remove relation from publication by mapping OID.
    1608              :  */
    1609              : void
    1610          424 : RemovePublicationRelById(Oid proid)
    1611              : {
    1612              :     Relation    rel;
    1613              :     HeapTuple   tup;
    1614              :     Form_pg_publication_rel pubrel;
    1615          424 :     List       *relids = NIL;
    1616              : 
    1617          424 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
    1618              : 
    1619          424 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
    1620              : 
    1621          424 :     if (!HeapTupleIsValid(tup))
    1622            0 :         elog(ERROR, "cache lookup failed for publication table %u",
    1623              :              proid);
    1624              : 
    1625          424 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
    1626              : 
    1627              :     /*
    1628              :      * Invalidate relcache so that publication info is rebuilt.
    1629              :      *
    1630              :      * For the partitioned tables, we must invalidate all partitions contained
    1631              :      * in the respective partition hierarchies, not just the one explicitly
    1632              :      * mentioned in the publication. This is required because we implicitly
    1633              :      * publish the child tables when the parent table is published.
    1634              :      */
    1635          424 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
    1636              :                                             pubrel->prrelid);
    1637              : 
    1638          424 :     InvalidatePublicationRels(relids);
    1639              : 
    1640          424 :     CatalogTupleDelete(rel, &tup->t_self);
    1641              : 
    1642          424 :     ReleaseSysCache(tup);
    1643              : 
    1644          424 :     table_close(rel, RowExclusiveLock);
    1645          424 : }
    1646              : 
    1647              : /*
    1648              :  * Remove the publication by mapping OID.
    1649              :  */
    1650              : void
    1651          250 : RemovePublicationById(Oid pubid)
    1652              : {
    1653              :     Relation    rel;
    1654              :     HeapTuple   tup;
    1655              :     Form_pg_publication pubform;
    1656              : 
    1657          250 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1658              : 
    1659          250 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1660          250 :     if (!HeapTupleIsValid(tup))
    1661            0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1662              : 
    1663          250 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1664              : 
    1665              :     /* Invalidate relcache so that publication info is rebuilt. */
    1666          250 :     if (pubform->puballtables)
    1667           35 :         CacheInvalidateRelcacheAll();
    1668              : 
    1669          250 :     CatalogTupleDelete(rel, &tup->t_self);
    1670              : 
    1671          250 :     ReleaseSysCache(tup);
    1672              : 
    1673          250 :     table_close(rel, RowExclusiveLock);
    1674          250 : }
    1675              : 
    1676              : /*
    1677              :  * Remove schema from publication by mapping OID.
    1678              :  */
    1679              : void
    1680           96 : RemovePublicationSchemaById(Oid psoid)
    1681              : {
    1682              :     Relation    rel;
    1683              :     HeapTuple   tup;
    1684           96 :     List       *schemaRels = NIL;
    1685              :     Form_pg_publication_namespace pubsch;
    1686              : 
    1687           96 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
    1688              : 
    1689           96 :     tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
    1690              : 
    1691           96 :     if (!HeapTupleIsValid(tup))
    1692            0 :         elog(ERROR, "cache lookup failed for publication schema %u", psoid);
    1693              : 
    1694           96 :     pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
    1695              : 
    1696              :     /*
    1697              :      * Invalidate relcache so that publication info is rebuilt. See
    1698              :      * RemovePublicationRelById for why we need to consider all the
    1699              :      * partitions.
    1700              :      */
    1701           96 :     schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
    1702              :                                                PUBLICATION_PART_ALL);
    1703           96 :     InvalidatePublicationRels(schemaRels);
    1704              : 
    1705           96 :     CatalogTupleDelete(rel, &tup->t_self);
    1706              : 
    1707           96 :     ReleaseSysCache(tup);
    1708              : 
    1709           96 :     table_close(rel, RowExclusiveLock);
    1710           96 : }
    1711              : 
    1712              : /*
    1713              :  * Open relations specified by a PublicationTable list.
    1714              :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
    1715              :  * add them to a publication.
    1716              :  */
    1717              : static List *
    1718          658 : OpenTableList(List *tables)
    1719              : {
    1720          658 :     List       *relids = NIL;
    1721          658 :     List       *rels = NIL;
    1722              :     ListCell   *lc;
    1723          658 :     List       *relids_with_rf = NIL;
    1724          658 :     List       *relids_with_collist = NIL;
    1725              : 
    1726              :     /*
    1727              :      * Open, share-lock, and check all the explicitly-specified relations
    1728              :      */
    1729         1350 :     foreach(lc, tables)
    1730              :     {
    1731          707 :         PublicationTable *t = lfirst_node(PublicationTable, lc);
    1732          707 :         bool        recurse = t->relation->inh;
    1733              :         Relation    rel;
    1734              :         Oid         myrelid;
    1735              :         PublicationRelInfo *pub_rel;
    1736              : 
    1737              :         /* Allow query cancel in case this takes a long time */
    1738          707 :         CHECK_FOR_INTERRUPTS();
    1739              : 
    1740          707 :         rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
    1741          704 :         myrelid = RelationGetRelid(rel);
    1742              : 
    1743              :         /*
    1744              :          * Filter out duplicates if user specifies "foo, foo".
    1745              :          *
    1746              :          * Note that this algorithm is known to not be very efficient (O(N^2))
    1747              :          * but given that it only works on list of tables given to us by user
    1748              :          * it's deemed acceptable.
    1749              :          */
    1750          704 :         if (list_member_oid(relids, myrelid))
    1751              :         {
    1752              :             /* Disallow duplicate tables if there are any with row filters. */
    1753           12 :             if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
    1754            6 :                 ereport(ERROR,
    1755              :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1756              :                          errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1757              :                                 RelationGetRelationName(rel))));
    1758              : 
    1759              :             /* Disallow duplicate tables if there are any with column lists. */
    1760            6 :             if (t->columns || list_member_oid(relids_with_collist, myrelid))
    1761            6 :                 ereport(ERROR,
    1762              :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1763              :                          errmsg("conflicting or redundant column lists for table \"%s\"",
    1764              :                                 RelationGetRelationName(rel))));
    1765              : 
    1766            0 :             table_close(rel, ShareUpdateExclusiveLock);
    1767            0 :             continue;
    1768              :         }
    1769              : 
    1770          692 :         pub_rel = palloc_object(PublicationRelInfo);
    1771          692 :         pub_rel->relation = rel;
    1772          692 :         pub_rel->whereClause = t->whereClause;
    1773          692 :         pub_rel->columns = t->columns;
    1774          692 :         rels = lappend(rels, pub_rel);
    1775          692 :         relids = lappend_oid(relids, myrelid);
    1776              : 
    1777          692 :         if (t->whereClause)
    1778          202 :             relids_with_rf = lappend_oid(relids_with_rf, myrelid);
    1779              : 
    1780          692 :         if (t->columns)
    1781          205 :             relids_with_collist = lappend_oid(relids_with_collist, myrelid);
    1782              : 
    1783              :         /*
    1784              :          * Add children of this rel, if requested, so that they too are added
    1785              :          * to the publication.  A partitioned table can't have any inheritance
    1786              :          * children other than its partitions, which need not be explicitly
    1787              :          * added to the publication.
    1788              :          */
    1789          692 :         if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
    1790              :         {
    1791              :             List       *children;
    1792              :             ListCell   *child;
    1793              : 
    1794          580 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
    1795              :                                            NULL);
    1796              : 
    1797         1164 :             foreach(child, children)
    1798              :             {
    1799          584 :                 Oid         childrelid = lfirst_oid(child);
    1800              : 
    1801              :                 /* Allow query cancel in case this takes a long time */
    1802          584 :                 CHECK_FOR_INTERRUPTS();
    1803              : 
    1804              :                 /*
    1805              :                  * Skip duplicates if user specified both parent and child
    1806              :                  * tables.
    1807              :                  */
    1808          584 :                 if (list_member_oid(relids, childrelid))
    1809              :                 {
    1810              :                     /*
    1811              :                      * We don't allow to specify row filter for both parent
    1812              :                      * and child table at the same time as it is not very
    1813              :                      * clear which one should be given preference.
    1814              :                      */
    1815          580 :                     if (childrelid != myrelid &&
    1816            0 :                         (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
    1817            0 :                         ereport(ERROR,
    1818              :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1819              :                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1820              :                                         RelationGetRelationName(rel))));
    1821              : 
    1822              :                     /*
    1823              :                      * We don't allow to specify column list for both parent
    1824              :                      * and child table at the same time as it is not very
    1825              :                      * clear which one should be given preference.
    1826              :                      */
    1827          580 :                     if (childrelid != myrelid &&
    1828            0 :                         (t->columns || list_member_oid(relids_with_collist, childrelid)))
    1829            0 :                         ereport(ERROR,
    1830              :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1831              :                                  errmsg("conflicting or redundant column lists for table \"%s\"",
    1832              :                                         RelationGetRelationName(rel))));
    1833              : 
    1834          580 :                     continue;
    1835              :                 }
    1836              : 
    1837              :                 /* find_all_inheritors already got lock */
    1838            4 :                 rel = table_open(childrelid, NoLock);
    1839            4 :                 pub_rel = palloc_object(PublicationRelInfo);
    1840            4 :                 pub_rel->relation = rel;
    1841              :                 /* child inherits WHERE clause from parent */
    1842            4 :                 pub_rel->whereClause = t->whereClause;
    1843              : 
    1844              :                 /* child inherits column list from parent */
    1845            4 :                 pub_rel->columns = t->columns;
    1846            4 :                 rels = lappend(rels, pub_rel);
    1847            4 :                 relids = lappend_oid(relids, childrelid);
    1848              : 
    1849            4 :                 if (t->whereClause)
    1850            1 :                     relids_with_rf = lappend_oid(relids_with_rf, childrelid);
    1851              : 
    1852            4 :                 if (t->columns)
    1853            0 :                     relids_with_collist = lappend_oid(relids_with_collist, childrelid);
    1854              :             }
    1855              :         }
    1856              :     }
    1857              : 
    1858          643 :     list_free(relids);
    1859          643 :     list_free(relids_with_rf);
    1860              : 
    1861          643 :     return rels;
    1862              : }
    1863              : 
    1864              : /*
    1865              :  * Close all relations in the list.
    1866              :  */
    1867              : static void
    1868          751 : CloseTableList(List *rels)
    1869              : {
    1870              :     ListCell   *lc;
    1871              : 
    1872         1528 :     foreach(lc, rels)
    1873              :     {
    1874              :         PublicationRelInfo *pub_rel;
    1875              : 
    1876          777 :         pub_rel = (PublicationRelInfo *) lfirst(lc);
    1877          777 :         table_close(pub_rel->relation, NoLock);
    1878              :     }
    1879              : 
    1880          751 :     list_free_deep(rels);
    1881          751 : }
    1882              : 
    1883              : /*
    1884              :  * Lock the schemas specified in the schema list in AccessShareLock mode in
    1885              :  * order to prevent concurrent schema deletion.
    1886              :  */
    1887              : static void
    1888          518 : LockSchemaList(List *schemalist)
    1889              : {
    1890              :     ListCell   *lc;
    1891              : 
    1892          680 :     foreach(lc, schemalist)
    1893              :     {
    1894          162 :         Oid         schemaid = lfirst_oid(lc);
    1895              : 
    1896              :         /* Allow query cancel in case this takes a long time */
    1897          162 :         CHECK_FOR_INTERRUPTS();
    1898          162 :         LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
    1899              : 
    1900              :         /*
    1901              :          * It is possible that by the time we acquire the lock on schema,
    1902              :          * concurrent DDL has removed it. We can test this by checking the
    1903              :          * existence of schema.
    1904              :          */
    1905          162 :         if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
    1906            0 :             ereport(ERROR,
    1907              :                     errcode(ERRCODE_UNDEFINED_SCHEMA),
    1908              :                     errmsg("schema with OID %u does not exist", schemaid));
    1909              :     }
    1910          518 : }
    1911              : 
    1912              : /*
    1913              :  * Add listed tables to the publication.
    1914              :  */
    1915              : static void
    1916          542 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
    1917              :                      AlterPublicationStmt *stmt)
    1918              : {
    1919              :     ListCell   *lc;
    1920              : 
    1921         1088 :     foreach(lc, rels)
    1922              :     {
    1923          580 :         PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
    1924          580 :         Relation    rel = pub_rel->relation;
    1925              :         ObjectAddress obj;
    1926              : 
    1927              :         /* Must be owner of the table or superuser. */
    1928          580 :         if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
    1929            3 :             aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
    1930            3 :                            RelationGetRelationName(rel));
    1931              : 
    1932          577 :         obj = publication_add_relation(pubid, pub_rel, if_not_exists);
    1933          546 :         if (stmt)
    1934              :         {
    1935          314 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1936              :                                              (Node *) stmt);
    1937              : 
    1938          314 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
    1939              :                                        obj.objectId, 0);
    1940              :         }
    1941              :     }
    1942          508 : }
    1943              : 
    1944              : /*
    1945              :  * Remove listed tables from the publication.
    1946              :  */
    1947              : static void
    1948          252 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
    1949              : {
    1950              :     ObjectAddress obj;
    1951              :     ListCell   *lc;
    1952              :     Oid         prid;
    1953              : 
    1954          483 :     foreach(lc, rels)
    1955              :     {
    1956          240 :         PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
    1957          240 :         Relation    rel = pubrel->relation;
    1958          240 :         Oid         relid = RelationGetRelid(rel);
    1959              : 
    1960          240 :         if (pubrel->columns)
    1961            0 :             ereport(ERROR,
    1962              :                     errcode(ERRCODE_SYNTAX_ERROR),
    1963              :                     errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
    1964              : 
    1965          240 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
    1966              :                                ObjectIdGetDatum(relid),
    1967              :                                ObjectIdGetDatum(pubid));
    1968          240 :         if (!OidIsValid(prid))
    1969              :         {
    1970            6 :             if (missing_ok)
    1971            0 :                 continue;
    1972              : 
    1973            6 :             ereport(ERROR,
    1974              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    1975              :                      errmsg("relation \"%s\" is not part of the publication",
    1976              :                             RelationGetRelationName(rel))));
    1977              :         }
    1978              : 
    1979          234 :         if (pubrel->whereClause)
    1980            3 :             ereport(ERROR,
    1981              :                     (errcode(ERRCODE_SYNTAX_ERROR),
    1982              :                      errmsg("cannot use a WHERE clause when removing a table from a publication")));
    1983              : 
    1984          231 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
    1985          231 :         performDeletion(&obj, DROP_CASCADE, 0);
    1986              :     }
    1987          243 : }
    1988              : 
    1989              : /*
    1990              :  * Add listed schemas to the publication.
    1991              :  */
    1992              : static void
    1993          294 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
    1994              :                       AlterPublicationStmt *stmt)
    1995              : {
    1996              :     ListCell   *lc;
    1997              : 
    1998          419 :     foreach(lc, schemas)
    1999              :     {
    2000          131 :         Oid         schemaid = lfirst_oid(lc);
    2001              :         ObjectAddress obj;
    2002              : 
    2003          131 :         obj = publication_add_schema(pubid, schemaid, if_not_exists);
    2004          125 :         if (stmt)
    2005              :         {
    2006           34 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    2007              :                                              (Node *) stmt);
    2008              : 
    2009           34 :             InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
    2010              :                                        obj.objectId, 0);
    2011              :         }
    2012              :     }
    2013          288 : }
    2014              : 
    2015              : /*
    2016              :  * Remove listed schemas from the publication.
    2017              :  */
    2018              : static void
    2019          221 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
    2020              : {
    2021              :     ObjectAddress obj;
    2022              :     ListCell   *lc;
    2023              :     Oid         psid;
    2024              : 
    2025          246 :     foreach(lc, schemas)
    2026              :     {
    2027           28 :         Oid         schemaid = lfirst_oid(lc);
    2028              : 
    2029           28 :         psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
    2030              :                                Anum_pg_publication_namespace_oid,
    2031              :                                ObjectIdGetDatum(schemaid),
    2032              :                                ObjectIdGetDatum(pubid));
    2033           28 :         if (!OidIsValid(psid))
    2034              :         {
    2035            3 :             if (missing_ok)
    2036            0 :                 continue;
    2037              : 
    2038            3 :             ereport(ERROR,
    2039              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2040              :                      errmsg("tables from schema \"%s\" are not part of the publication",
    2041              :                             get_namespace_name(schemaid))));
    2042              :         }
    2043              : 
    2044           25 :         ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
    2045           25 :         performDeletion(&obj, DROP_CASCADE, 0);
    2046              :     }
    2047          218 : }
    2048              : 
    2049              : /*
    2050              :  * Internal workhorse for changing a publication owner
    2051              :  */
    2052              : static void
    2053           18 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    2054              : {
    2055              :     Form_pg_publication form;
    2056              : 
    2057           18 :     form = (Form_pg_publication) GETSTRUCT(tup);
    2058              : 
    2059           18 :     if (form->pubowner == newOwnerId)
    2060            6 :         return;
    2061              : 
    2062           12 :     if (!superuser())
    2063              :     {
    2064              :         AclResult   aclresult;
    2065              : 
    2066              :         /* Must be owner */
    2067            6 :         if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
    2068            0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    2069            0 :                            NameStr(form->pubname));
    2070              : 
    2071              :         /* Must be able to become new owner */
    2072            6 :         check_can_set_role(GetUserId(), newOwnerId);
    2073              : 
    2074              :         /* New owner must have CREATE privilege on database */
    2075            6 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
    2076            6 :         if (aclresult != ACLCHECK_OK)
    2077            0 :             aclcheck_error(aclresult, OBJECT_DATABASE,
    2078            0 :                            get_database_name(MyDatabaseId));
    2079              : 
    2080            6 :         if (!superuser_arg(newOwnerId))
    2081              :         {
    2082            6 :             if (form->puballtables || form->puballsequences ||
    2083            3 :                 is_schema_publication(form->oid))
    2084            3 :                 ereport(ERROR,
    2085              :                         errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2086              :                         errmsg("permission denied to change owner of publication \"%s\"",
    2087              :                                NameStr(form->pubname)),
    2088              :                         errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
    2089              :         }
    2090              :     }
    2091              : 
    2092            9 :     form->pubowner = newOwnerId;
    2093            9 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    2094              : 
    2095              :     /* Update owner dependency reference */
    2096            9 :     changeDependencyOnOwner(PublicationRelationId,
    2097              :                             form->oid,
    2098              :                             newOwnerId);
    2099              : 
    2100            9 :     InvokeObjectPostAlterHook(PublicationRelationId,
    2101              :                               form->oid, 0);
    2102              : }
    2103              : 
    2104              : /*
    2105              :  * Change publication owner -- by name
    2106              :  */
    2107              : ObjectAddress
    2108           18 : AlterPublicationOwner(const char *name, Oid newOwnerId)
    2109              : {
    2110              :     Oid         pubid;
    2111              :     HeapTuple   tup;
    2112              :     Relation    rel;
    2113              :     ObjectAddress address;
    2114              :     Form_pg_publication pubform;
    2115              : 
    2116           18 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2117              : 
    2118           18 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
    2119              : 
    2120           18 :     if (!HeapTupleIsValid(tup))
    2121            0 :         ereport(ERROR,
    2122              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2123              :                  errmsg("publication \"%s\" does not exist", name)));
    2124              : 
    2125           18 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    2126           18 :     pubid = pubform->oid;
    2127              : 
    2128           18 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2129              : 
    2130           15 :     ObjectAddressSet(address, PublicationRelationId, pubid);
    2131              : 
    2132           15 :     heap_freetuple(tup);
    2133              : 
    2134           15 :     table_close(rel, RowExclusiveLock);
    2135              : 
    2136           15 :     return address;
    2137              : }
    2138              : 
    2139              : /*
    2140              :  * Change publication owner -- by OID
    2141              :  */
    2142              : void
    2143            0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
    2144              : {
    2145              :     HeapTuple   tup;
    2146              :     Relation    rel;
    2147              : 
    2148            0 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2149              : 
    2150            0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    2151              : 
    2152            0 :     if (!HeapTupleIsValid(tup))
    2153            0 :         ereport(ERROR,
    2154              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2155              :                  errmsg("publication with OID %u does not exist", pubid)));
    2156              : 
    2157            0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2158              : 
    2159            0 :     heap_freetuple(tup);
    2160              : 
    2161            0 :     table_close(rel, RowExclusiveLock);
    2162            0 : }
    2163              : 
    2164              : /*
    2165              :  * Extract the publish_generated_columns option value from a DefElem. "stored"
    2166              :  * and "none" values are accepted.
    2167              :  */
    2168              : static char
    2169           38 : defGetGeneratedColsOption(DefElem *def)
    2170              : {
    2171           38 :     char       *sval = "";
    2172              : 
    2173              :     /*
    2174              :      * A parameter value is required.
    2175              :      */
    2176           38 :     if (def->arg)
    2177              :     {
    2178           35 :         sval = defGetString(def);
    2179              : 
    2180           35 :         if (pg_strcasecmp(sval, "none") == 0)
    2181           11 :             return PUBLISH_GENCOLS_NONE;
    2182           24 :         if (pg_strcasecmp(sval, "stored") == 0)
    2183           21 :             return PUBLISH_GENCOLS_STORED;
    2184              :     }
    2185              : 
    2186            6 :     ereport(ERROR,
    2187              :             errcode(ERRCODE_SYNTAX_ERROR),
    2188              :             errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
    2189              :             errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
    2190              : 
    2191              :     return PUBLISH_GENCOLS_NONE;    /* keep compiler quiet */
    2192              : }
        

Generated by: LCOV version 2.0-1