LCOV - code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 92.9 % 762 708
Test Date: 2026-03-22 08:15:57 Functions: 97.0 % 33 32
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * publicationcmds.c
       4              :  *      publication manipulation
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/backend/commands/publicationcmds.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/htup_details.h"
      18              : #include "access/table.h"
      19              : #include "access/xact.h"
      20              : #include "catalog/catalog.h"
      21              : #include "catalog/indexing.h"
      22              : #include "catalog/namespace.h"
      23              : #include "catalog/objectaccess.h"
      24              : #include "catalog/objectaddress.h"
      25              : #include "catalog/pg_database.h"
      26              : #include "catalog/pg_inherits.h"
      27              : #include "catalog/pg_namespace.h"
      28              : #include "catalog/pg_proc.h"
      29              : #include "catalog/pg_publication.h"
      30              : #include "catalog/pg_publication_namespace.h"
      31              : #include "catalog/pg_publication_rel.h"
      32              : #include "commands/defrem.h"
      33              : #include "commands/event_trigger.h"
      34              : #include "commands/publicationcmds.h"
      35              : #include "miscadmin.h"
      36              : #include "nodes/nodeFuncs.h"
      37              : #include "parser/parse_clause.h"
      38              : #include "parser/parse_collate.h"
      39              : #include "parser/parse_relation.h"
      40              : #include "rewrite/rewriteHandler.h"
      41              : #include "storage/lmgr.h"
      42              : #include "utils/acl.h"
      43              : #include "utils/builtins.h"
      44              : #include "utils/inval.h"
      45              : #include "utils/lsyscache.h"
      46              : #include "utils/rel.h"
      47              : #include "utils/syscache.h"
      48              : #include "utils/varlena.h"
      49              : 
      50              : 
      51              : /*
      52              :  * Information used to validate the columns in the row filter expression. See
      53              :  * contain_invalid_rfcolumn_walker for details.
      54              :  */
      55              : typedef struct rf_context
      56              : {
      57              :     Bitmapset  *bms_replident;  /* replica identity attnums, offset by FirstLowInvalidHeapAttributeNumber */
      58              :     bool        pubviaroot;     /* true if we are validating the parent
      59              :                                  * relation's row filter */
      60              :     Oid         relid;          /* relid of the relation being validated */
      61              :     Oid         parentid;       /* relid of the relation whose row filter is checked */
      62              : } rf_context;
      63              : 
      64              : static List *OpenTableList(List *tables);
      65              : static void CloseTableList(List *rels);
      66              : static void LockSchemaList(List *schemalist);
      67              : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
      68              :                                  AlterPublicationStmt *stmt);
      69              : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
      70              : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
      71              :                                   AlterPublicationStmt *stmt);
      72              : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
      73              : static char defGetGeneratedColsOption(DefElem *def);
      74              : 
      75              : 
      76              : static void
      77          666 : parse_publication_options(ParseState *pstate,
      78              :                           List *options,
      79              :                           bool *publish_given,
      80              :                           PublicationActions *pubactions,
      81              :                           bool *publish_via_partition_root_given,
      82              :                           bool *publish_via_partition_root,
      83              :                           bool *publish_generated_columns_given,
      84              :                           char *publish_generated_columns)
      85              : {
      86              :     ListCell   *lc;
      87              : 
      88          666 :     *publish_given = false;
      89          666 :     *publish_via_partition_root_given = false;
      90          666 :     *publish_generated_columns_given = false;
      91              : 
      92              :     /* defaults: all four actions published, no via-root, no generated columns */
      93          666 :     pubactions->pubinsert = true;
      94          666 :     pubactions->pubupdate = true;
      95          666 :     pubactions->pubdelete = true;
      96          666 :     pubactions->pubtruncate = true;
      97          666 :     *publish_via_partition_root = false;
      98          666 :     *publish_generated_columns = PUBLISH_GENCOLS_NONE;
      99              : 
     100              :     /* Parse options; a repeated option is reported via errorConflictingDefElem */
     101          898 :     foreach(lc, options)
     102              :     {
     103          256 :         DefElem    *defel = (DefElem *) lfirst(lc);
     104              : 
     105          256 :         if (strcmp(defel->defname, "publish") == 0)
     106              :         {
     107              :             char       *publish;
     108              :             List       *publish_list;
     109              :             ListCell   *lc2;
     110              : 
     111           89 :             if (*publish_given)
     112            0 :                 errorConflictingDefElem(defel, pstate);
     113              : 
     114              :             /*
     115              :              * If the publish option was given, only the explicitly listed
     116              :              * actions are published, so clear the defaults first.
     117              :              */
     118           89 :             pubactions->pubinsert = false;
     119           89 :             pubactions->pubupdate = false;
     120           89 :             pubactions->pubdelete = false;
     121           89 :             pubactions->pubtruncate = false;
     122              : 
     123           89 :             *publish_given = true;
     124              : 
     125              :             /*
     126              :              * SplitIdentifierString destructively modifies its input, so work
     127              :              * on a copy rather than on the memory of the executing statement.
     128              :              */
     129           89 :             publish = pstrdup(defGetString(defel));
     130              : 
     131           89 :             if (!SplitIdentifierString(publish, ',', &publish_list))
     132            0 :                 ereport(ERROR,
     133              :                         (errcode(ERRCODE_SYNTAX_ERROR),
     134              :                          errmsg("invalid list syntax in parameter \"%s\"",
     135              :                                 "publish")));
     136              : 
     137              :             /* Enable each listed action; an unrecognized value is an error. */
     138          208 :             foreach(lc2, publish_list)
     139              :             {
     140          123 :                 char       *publish_opt = (char *) lfirst(lc2);
     141              : 
     142          123 :                 if (strcmp(publish_opt, "insert") == 0)
     143           79 :                     pubactions->pubinsert = true;
     144           44 :                 else if (strcmp(publish_opt, "update") == 0)
     145           18 :                     pubactions->pubupdate = true;
     146           26 :                 else if (strcmp(publish_opt, "delete") == 0)
     147           11 :                     pubactions->pubdelete = true;
     148           15 :                 else if (strcmp(publish_opt, "truncate") == 0)
     149           11 :                     pubactions->pubtruncate = true;
     150              :                 else
     151            4 :                     ereport(ERROR,
     152              :                             (errcode(ERRCODE_SYNTAX_ERROR),
     153              :                              errmsg("unrecognized value for publication option \"%s\": \"%s\"",
     154              :                                     "publish", publish_opt)));
     155              :             }
     156              :         }
     157          167 :         else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
     158              :         {
     159          110 :             if (*publish_via_partition_root_given)
     160            4 :                 errorConflictingDefElem(defel, pstate);
     161          106 :             *publish_via_partition_root_given = true;
     162          106 :             *publish_via_partition_root = defGetBoolean(defel);
     163              :         }
     164           57 :         else if (strcmp(defel->defname, "publish_generated_columns") == 0)
     165              :         {
     166           53 :             if (*publish_generated_columns_given)
     167            4 :                 errorConflictingDefElem(defel, pstate);
     168           49 :             *publish_generated_columns_given = true;
     169           49 :             *publish_generated_columns = defGetGeneratedColsOption(defel);
     170              :         }
     171              :         else
     172            4 :             ereport(ERROR,
     173              :                     (errcode(ERRCODE_SYNTAX_ERROR),
     174              :                      errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
     175              :     }
     176          642 : }
     177              : 
     178              : /*
     179              :  * Split a list of PublicationObjSpec nodes into a list of schema OIDs, a list
     180              :  * of PublicationTable nodes (*rels) and a list of excluded tables (*exceptrels).
     181              :  */
     182              : static void
     183         1251 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
     184              :                            List **rels, List **exceptrels, List **schemas)
     185              : {
     186              :     ListCell   *cell;
     187              :     PublicationObjSpec *pubobj;
     188              : 
     189         1251 :     if (!pubobjspec_list)
     190          183 :         return;
     191              : 
     192         2264 :     foreach(cell, pubobjspec_list)
     193              :     {
     194              :         Oid         schemaid;
     195              :         List       *search_path;
     196              : 
     197         1220 :         pubobj = (PublicationObjSpec *) lfirst(cell);
     198              : 
     199         1220 :         switch (pubobj->pubobjtype)
     200              :         {
     201           59 :             case PUBLICATIONOBJ_EXCEPT_TABLE:
     202           59 :                 pubobj->pubtable->except = true;
     203           59 :                 *exceptrels = lappend(*exceptrels, pubobj->pubtable);
     204           59 :                 break;
     205          901 :             case PUBLICATIONOBJ_TABLE:
     206          901 :                 pubobj->pubtable->except = false;
     207          901 :                 *rels = lappend(*rels, pubobj->pubtable);
     208          901 :                 break;
     209          244 :             case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
     210          244 :                 schemaid = get_namespace_oid(pubobj->name, false);
     211              : 
     212              :                 /* Filter out duplicates if user specifies "sch1, sch1" */
     213          224 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     214          224 :                 break;
     215           16 :             case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
     216           16 :                 search_path = fetch_search_path(false);
     217           16 :                 if (search_path == NIL) /* nothing valid in search_path? */
     218            4 :                     ereport(ERROR,
     219              :                             errcode(ERRCODE_UNDEFINED_SCHEMA),
     220              :                             errmsg("no schema has been selected for CURRENT_SCHEMA"));
     221              : 
     222           12 :                 schemaid = linitial_oid(search_path);
     223           12 :                 list_free(search_path);
     224              : 
     225              :                 /* Filter out duplicates, e.g. if this schema is already in the list */
     226           12 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
     227           12 :                 break;
     228            0 :             default:
     229              :                 /* shouldn't happen */
     230            0 :                 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
     231              :                 break;
     232              :         }
     233              :     }
     234              : }
     235              : 
     236              : /*
     237              :  * Returns true if any of the columns used in the row filter WHERE expression is
     238              :  * not part of REPLICA IDENTITY, false otherwise.
     239              :  */
     240              : static bool
     241          167 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
     242              : {
     243          167 :     if (node == NULL)
     244            0 :         return false;
     245              : 
     246          167 :     if (IsA(node, Var))
     247              :     {
     248           68 :         Var        *var = (Var *) node;
     249           68 :         AttrNumber  attnum = var->varattno;
     250              : 
     251              :         /*
     252              :          * If pubviaroot is true, we are validating the row filter of the
     253              :          * parent table, but the bitmap contains the replica identity
     254              :          * information of the child table. So, map the parent's column to the
     255              :          * child's by name, as parent and child column order could differ.
     256              :          */
     257           68 :         if (context->pubviaroot)
     258              :         {
     259           10 :             char       *colname = get_attname(context->parentid, attnum, false);
     260              : 
     261           10 :             attnum = get_attnum(context->relid, colname);
     262              :         }
     263              : 
     264           68 :         if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
     265           68 :                            context->bms_replident))
     266           40 :             return true;
     267              :     }
     268              : 
     269          127 :     return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
     270              :                                   context);
     271              : }
     272              : 
     273              : /*
     274              :  * Check if all columns referenced in the filter expression are part of the
     275              :  * REPLICA IDENTITY index or not.
     276              :  *
     277              :  * Returns true if any invalid column is found.
     278              :  */
     279              : bool
     280          434 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     281              :                                bool pubviaroot)
     282              : {
     283              :     HeapTuple   rftuple;
     284          434 :     Oid         relid = RelationGetRelid(relation);
     285          434 :     Oid         publish_as_relid = RelationGetRelid(relation);
     286          434 :     bool        result = false;
     287              :     Datum       rfdatum;
     288              :     bool        rfisnull;
     289              : 
     290              :     /*
     291              :      * FULL means all columns are in the REPLICA IDENTITY, so all columns are
     292              :      * allowed in the row filter and we can skip the validation.
     293              :      */
     294          434 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     295          113 :         return false;
     296              : 
     297              :     /*
     298              :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     299              :      * is published via this publication as we need to use its row filter
     300              :      * expression to filter the partition's changes.
     301              :      *
     302              :      * Note that even though the row filter used is for an ancestor, the
     303              :      * REPLICA IDENTITY used will be for the actual child table.
     304              :      */
     305          321 :     if (pubviaroot && relation->rd_rel->relispartition)
     306              :     {
     307              :         publish_as_relid
     308           68 :             = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     309              : 
     310           68 :         if (!OidIsValid(publish_as_relid))
     311            3 :             publish_as_relid = relid;   /* no published ancestor; use the partition itself */
     312              :     }
     313              : 
     314          321 :     rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     315              :                               ObjectIdGetDatum(publish_as_relid),
     316              :                               ObjectIdGetDatum(pubid));
     317              : 
     318          321 :     if (!HeapTupleIsValid(rftuple))
     319           34 :         return false;           /* no pg_publication_rel entry, hence no row filter */
     320              : 
     321          287 :     rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     322              :                               Anum_pg_publication_rel_prqual,
     323              :                               &rfisnull);
     324              : 
     325          287 :     if (!rfisnull)
     326              :     {
     327           67 :         rf_context  context = {0};
     328              :         Node       *rfnode;
     329           67 :         Bitmapset  *bms = NULL;
     330              : 
     331           67 :         context.pubviaroot = pubviaroot;
     332           67 :         context.parentid = publish_as_relid;
     333           67 :         context.relid = relid;
     334              : 
     335              :         /* Remember columns that are part of the REPLICA IDENTITY */
     336           67 :         bms = RelationGetIndexAttrBitmap(relation,
     337              :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     338              : 
     339           67 :         context.bms_replident = bms;
     340           67 :         rfnode = stringToNode(TextDatumGetCString(rfdatum));
     341           67 :         result = contain_invalid_rfcolumn_walker(rfnode, &context);
     342              :     }
     343              : 
     344          287 :     ReleaseSysCache(rftuple);
     345              : 
     346          287 :     return result;
     347              : }
     348              : 
     349              : /*
     350              :  * Check for invalid columns in the publication table definition.
     351              :  *
     352              :  * This function evaluates two conditions:
     353              :  *
     354              :  * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
     355              :  *    by the column list. If any column is missing, *invalid_column_list is set
     356              :  *    to true.
     357              :  * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
     358              :  *    are published, either by being explicitly named in the column list or, if
     359              :  *    no column list is specified, by setting the option
     360              :  *    publish_generated_columns to stored. If any unpublished
     361              :  *    generated column is found, *invalid_gen_col is set to true.
     362              :  *
     363              :  * Returns true if any of the above conditions are not met.
     364              :  */
     365              : bool
     366          547 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
     367              :                             bool pubviaroot, char pubgencols_type,
     368              :                             bool *invalid_column_list,
     369              :                             bool *invalid_gen_col)
     370              : {
     371          547 :     Oid         relid = RelationGetRelid(relation);
     372          547 :     Oid         publish_as_relid = RelationGetRelid(relation);
     373              :     Bitmapset  *idattrs;
     374          547 :     Bitmapset  *columns = NULL;
     375          547 :     TupleDesc   desc = RelationGetDescr(relation);
     376              :     Publication *pub;
     377              :     int         x;
     378              : 
     379          547 :     *invalid_column_list = false;
     380          547 :     *invalid_gen_col = false;
     381              : 
     382              :     /*
     383              :      * For a partition, if pubviaroot is true, find the topmost ancestor that
     384              :      * is published via this publication as we need to use its column list for
     385              :      * the changes.
     386              :      *
     387              :      * Note that even though the column list used is for an ancestor, the
     388              :      * REPLICA IDENTITY used will be for the actual child table.
     389              :      */
     390          547 :     if (pubviaroot && relation->rd_rel->relispartition)
     391              :     {
     392           97 :         publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
     393              : 
     394           97 :         if (!OidIsValid(publish_as_relid))
     395           18 :             publish_as_relid = relid;   /* no published ancestor; use the partition itself */
     396              :     }
     397              : 
     398              :     /* Fetch the column list */
     399          547 :     pub = GetPublication(pubid);
     400          547 :     check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
     401              : 
     402          547 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     403              :     {
     404              :         /* With REPLICA IDENTITY FULL, no column list is allowed. */
     405          120 :         *invalid_column_list = (columns != NULL);
     406              : 
     407              :         /*
     408              :          * As we don't allow a column list with REPLICA IDENTITY FULL, the
     409              :          * publish_generated_columns option must be set to stored if the table
     410              :          * has any stored generated columns.
     411              :          */
     412          120 :         if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
     413          112 :             relation->rd_att->constr &&
     414           52 :             relation->rd_att->constr->has_generated_stored)
     415            4 :             *invalid_gen_col = true;
     416              : 
     417              :         /*
     418              :          * Virtual generated columns are currently not supported for logical
     419              :          * replication at all.
     420              :          */
     421          120 :         if (relation->rd_att->constr &&
     422           60 :             relation->rd_att->constr->has_generated_virtual)
     423            8 :             *invalid_gen_col = true;
     424              : 
     425          120 :         if (*invalid_gen_col && *invalid_column_list)
     426            0 :             return true;        /* both flags set; NOTE(review): this exit skips bms_free(columns) */
     427              :     }
     428              : 
     429              :     /* Remember columns that are part of the REPLICA IDENTITY */
     430          547 :     idattrs = RelationGetIndexAttrBitmap(relation,
     431              :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
     432              : 
     433              :     /*
     434              :      * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
     435              :      * (to handle system columns the usual way), while the column list does
     436              :      * not use an offset, so we can't do bms_is_subset(). Instead, we have to
     437              :      * loop over the idattrs and check that all of them are in the list.
     438              :      */
     439          547 :     x = -1;
     440          925 :     while ((x = bms_next_member(idattrs, x)) >= 0)
     441              :     {
     442          382 :         AttrNumber  attnum = (x + FirstLowInvalidHeapAttributeNumber);
     443          382 :         Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
     444              : 
     445          382 :         if (columns == NULL)
     446              :         {
     447              :             /*
     448              :              * The publish_generated_columns option must be set to stored if
     449              :              * the REPLICA IDENTITY contains any stored generated column.
     450              :              */
     451          247 :             if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
     452              :             {
     453            4 :                 *invalid_gen_col = true;
     454            4 :                 break;
     455              :             }
     456              : 
     457              :             /*
     458              :              * The equivalent setting for virtual generated columns does not
     459              :              * exist yet.
     460              :              */
     461          243 :             if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
     462              :             {
     463            0 :                 *invalid_gen_col = true;
     464            0 :                 break;
     465              :             }
     466              : 
     467              :             /* Skip validating the column list since it is not defined */
     468          243 :             continue;
     469              :         }
     470              : 
     471              :         /*
     472              :          * If pubviaroot is true, we are validating the column list of the
     473              :          * parent table, but the bitmap contains the replica identity
     474              :          * information of the child table. The parent/child attnums may not
     475              :          * match, so translate them to the parent - get the attname from the
     476              :          * child, and look it up in the parent.
     477              :          */
     478          135 :         if (pubviaroot)
     479              :         {
     480              :             /* attribute name in the child table */
     481           51 :             char       *colname = get_attname(relid, attnum, false);
     482              : 
     483              :             /*
     484              :              * Determine the attnum for the attribute name in parent (we are
     485              :              * using the column list defined on the parent).
     486              :              */
     487           51 :             attnum = get_attnum(publish_as_relid, colname);
     488              :         }
     489              : 
     490              :         /* flag a replica identity column that is not covered by the column list */
     491          135 :         *invalid_column_list |= !bms_is_member(attnum, columns);
     492              : 
     493          135 :         if (*invalid_column_list && *invalid_gen_col)
     494            0 :             break;
     495              :     }
     496              : 
     497          547 :     bms_free(columns);
     498          547 :     bms_free(idattrs);
     499              : 
     500          547 :     return *invalid_column_list || *invalid_gen_col;
     501              : }
     502              : 
     503              : /*
     504              :  * Invalidate entries in the RelationSyncCache for relations included in the
     505              :  * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
     506              :  *
     507              :  * If 'puballtables' is true, invalidate all cache entries.
     508              :  */
     509              : void
     510           20 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
     511              : {
     512           20 :     if (puballtables)
     513              :     {
     514            3 :         CacheInvalidateRelSyncAll();
     515              :     }
     516              :     else
     517              :     {
     518           17 :         List       *relids = NIL;
     519           17 :         List       *schemarelids = NIL;
     520              : 
     521              :         /*
     522              :          * For a partitioned table, we must invalidate the table itself and
     523              :          * all of its partitions. WAL records for INSERT/UPDATE/DELETE specify
     524              :          * leaf tables as a target. However, WAL records for TRUNCATE specify
     525              :          * both a root and its leaves.
     526              :          */
     527           17 :         relids = GetIncludedPublicationRelations(pubid,
     528              :                                                  PUBLICATION_PART_ALL);
     529           17 :         schemarelids = GetAllSchemaPublicationRelations(pubid,
     530              :                                                         PUBLICATION_PART_ALL);
     531              : 
     532           17 :         relids = list_concat_unique_oid(relids, schemarelids);
     533              : 
     534              :         /* Invalidate the RelationSyncCache entry of each collected relation */
     535           37 :         foreach_oid(relid, relids)
     536            3 :             CacheInvalidateRelSync(relid);
     537              :     }
     538              : 
     539           20 :     return;
     540              : }
     541              : 
     542              : /* check_functions_in_node callback */
     543              : static bool
     544          278 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
     545              : {
     546          278 :     return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
     547              :             func_id >= FirstNormalObjectId);
     548              : }
     549              : 
     550              : /*
     551              :  * The row filter walker checks if the row filter expression is a "simple
     552              :  * expression".
     553              :  *
     554              :  * It allows only simple or compound expressions such as:
     555              :  * - (Var Op Const)
     556              :  * - (Var Op Var)
     557              :  * - (Var Op Const) AND/OR (Var Op Const)
     558              :  * - etc
     559              :  * (where Var is a column of the table this filter belongs to)
     560              :  *
     561              :  * The simple expression has the following restrictions:
     562              :  * - User-defined operators are not allowed;
     563              :  * - User-defined functions are not allowed;
     564              :  * - User-defined types are not allowed;
     565              :  * - User-defined collations are not allowed;
     566              :  * - Non-immutable built-in functions are not allowed;
     567              :  * - System columns are not allowed.
     568              :  *
     569              :  * NOTES
     570              :  *
     571              :  * We don't allow user-defined functions/operators/types/collations because
     572              :  * (a) if a user drops a user-defined object used in a row filter expression or
     573              :  * if there is any other error while using it, the logical decoding
     574              :  * infrastructure won't be able to recover from such an error even if the
     575              :  * object is recreated again because a historic snapshot is used to evaluate
     576              :  * the row filter;
     577              :  * (b) a user-defined function can be used to access tables that could have
     578              :  * unpleasant results because a historic snapshot is used. That's why only
     579              :  * immutable built-in functions are allowed in row filter expressions.
     580              :  *
     581              :  * We don't allow system columns because currently, we don't have that
     582              :  * information in the tuple passed to downstream. Also, as we don't replicate
     583              :  * those to subscribers, there doesn't seem to be a need for a filter on those
     584              :  * columns.
     585              :  *
     586              :  * We can allow other node types after more analysis and testing.
     587              :  */
     588              : static bool
     589          923 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
     590              : {
     591          923 :     char       *errdetail_msg = NULL;
     592              : 
     593          923 :     if (node == NULL)
     594            4 :         return false;
     595              : 
     596          919 :     switch (nodeTag(node))
     597              :     {
     598          248 :         case T_Var:
     599              :             /* System columns are not allowed. */
     600          248 :             if (((Var *) node)->varattno < InvalidAttrNumber)
     601            4 :                 errdetail_msg = _("System columns are not allowed.");
     602          248 :             break;
     603          249 :         case T_OpExpr:
     604              :         case T_DistinctExpr:
     605              :         case T_NullIfExpr:
     606              :             /* OK, except user-defined operators are not allowed. */
     607          249 :             if (((OpExpr *) node)->opno >= FirstNormalObjectId)
     608            4 :                 errdetail_msg = _("User-defined operators are not allowed.");
     609          249 :             break;
     610            4 :         case T_ScalarArrayOpExpr:
     611              :             /* OK, except user-defined operators are not allowed. */
     612            4 :             if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
     613            0 :                 errdetail_msg = _("User-defined operators are not allowed.");
     614              : 
     615              :             /*
     616              :              * We don't need to check the hashfuncid and negfuncid of
     617              :              * ScalarArrayOpExpr as those functions are only built for a
     618              :              * subquery.
     619              :              */
     620            4 :             break;
     621            4 :         case T_RowCompareExpr:
     622              :             {
     623              :                 ListCell   *opid;
     624              : 
     625              :                 /* OK, except user-defined operators are not allowed. */
     626           12 :                 foreach(opid, ((RowCompareExpr *) node)->opnos)
     627              :                 {
     628            8 :                     if (lfirst_oid(opid) >= FirstNormalObjectId)
     629              :                     {
     630            0 :                         errdetail_msg = _("User-defined operators are not allowed.");
     631            0 :                         break;
     632              :                     }
     633              :                 }
     634              :             }
     635            4 :             break;
     636          410 :         case T_Const:
     637              :         case T_FuncExpr:
     638              :         case T_BoolExpr:
     639              :         case T_RelabelType:
     640              :         case T_CollateExpr:
     641              :         case T_CaseExpr:
     642              :         case T_CaseTestExpr:
     643              :         case T_ArrayExpr:
     644              :         case T_RowExpr:
     645              :         case T_CoalesceExpr:
     646              :         case T_MinMaxExpr:
     647              :         case T_XmlExpr:
     648              :         case T_NullTest:
     649              :         case T_BooleanTest:
     650              :         case T_List:
     651              :             /* OK, supported */
     652          410 :             break;
     653            4 :         default:
     654            4 :             errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
     655            4 :             break;
     656              :     }
     657              : 
     658              :     /*
     659              :      * For all the supported nodes, if we haven't already found a problem,
     660              :      * check the types, functions, and collations used in it.  We check List
     661              :      * by walking through each element.
     662              :      */
     663          919 :     if (!errdetail_msg && !IsA(node, List))
     664              :     {
     665          871 :         if (exprType(node) >= FirstNormalObjectId)
     666            4 :             errdetail_msg = _("User-defined types are not allowed.");
     667          867 :         else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
     668              :                                          pstate))
     669            8 :             errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
     670         1718 :         else if (exprCollation(node) >= FirstNormalObjectId ||
     671          859 :                  exprInputCollation(node) >= FirstNormalObjectId)
     672            4 :             errdetail_msg = _("User-defined collations are not allowed.");
     673              :     }
     674              : 
     675              :     /*
     676              :      * If we found a problem in this node, throw error now. Otherwise keep
     677              :      * going.
     678              :      */
     679          919 :     if (errdetail_msg)
     680           28 :         ereport(ERROR,
     681              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     682              :                  errmsg("invalid publication WHERE expression"),
     683              :                  errdetail_internal("%s", errdetail_msg),
     684              :                  parser_errposition(pstate, exprLocation(node))));
     685              : 
     686          891 :     return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
     687              :                                   pstate);
     688              : }
     689              : 
     690              : /*
     691              :  * Check if the row filter expression is a "simple expression".
     692              :  *
     693              :  * See check_simple_rowfilter_expr_walker for details.
     694              :  */
     695              : static bool
     696          241 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
     697              : {
     698          241 :     return check_simple_rowfilter_expr_walker(node, pstate);
     699              : }
     700              : 
     701              : /*
     702              :  * Transform the publication WHERE expression for all the relations in the list,
     703              :  * ensuring it is coerced to boolean and necessary collation information is
     704              :  * added if required, and add a new nsitem/RTE for the associated relation to
     705              :  * the ParseState's namespace list.
     706              :  *
     707              :  * Also check the publication row filter expression and throw an error if
     708              :  * anything not permitted or unexpected is encountered.
     709              :  */
     710              : static void
     711          747 : TransformPubWhereClauses(List *tables, const char *queryString,
     712              :                          bool pubviaroot)
     713              : {
     714              :     ListCell   *lc;
     715              : 
     716         1495 :     foreach(lc, tables)
     717              :     {
     718              :         ParseNamespaceItem *nsitem;
     719          788 :         Node       *whereclause = NULL;
     720              :         ParseState *pstate;
     721          788 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     722              : 
     723          788 :         if (pri->whereClause == NULL)
     724          535 :             continue;
     725              : 
     726              :         /*
     727              :          * If the publication doesn't publish changes via the root partitioned
     728              :          * table, the partition's row filter will be used. So disallow using
     729              :          * a WHERE clause on a partitioned table in this case.
     730              :          */
     731          253 :         if (!pubviaroot &&
     732          235 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     733            4 :             ereport(ERROR,
     734              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     735              :                      errmsg("cannot use publication WHERE clause for relation \"%s\"",
     736              :                             RelationGetRelationName(pri->relation)),
     737              :                      errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
     738              :                                "publish_via_partition_root")));
     739              : 
     740              :         /*
     741              :          * A fresh pstate is required so that we only have "this" table in its
     742              :          * rangetable
     743              :          */
     744          249 :         pstate = make_parsestate(NULL);
     745          249 :         pstate->p_sourcetext = queryString;
     746          249 :         nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
     747              :                                                AccessShareLock, NULL,
     748              :                                                false, false);
     749          249 :         addNSItemToQuery(pstate, nsitem, false, true, true);
     750              : 
     751          249 :         whereclause = transformWhereClause(pstate,
     752          249 :                                            copyObject(pri->whereClause),
     753              :                                            EXPR_KIND_WHERE,
     754              :                                            "PUBLICATION WHERE");
     755              : 
     756              :         /* Fix up collation information */
     757          241 :         assign_expr_collations(pstate, whereclause);
     758              : 
     759          241 :         whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
     760              : 
     761              :         /*
     762              :          * We allow only simple expressions in row filters. See
     763              :          * check_simple_rowfilter_expr_walker.
     764              :          */
     765          241 :         check_simple_rowfilter_expr(whereclause, pstate);
     766              : 
     767          213 :         free_parsestate(pstate);
     768              : 
     769          213 :         pri->whereClause = whereclause;
     770              :     }
     771          707 : }
     772              : 
     773              : 
     774              : /*
     775              :  * Given a list of tables that are going to be added to a publication,
     776              :  * verify that they fulfill the necessary preconditions, namely: no tables
     777              :  * have a column list if any schema is published; and partitioned tables do
     778              :  * not have column lists if publish_via_partition_root is not set.
     779              :  *
     780              :  * 'publish_schema' indicates that the publication contains any TABLES IN
     781              :  * SCHEMA elements (newly added in this command, or preexisting).
     782              :  * 'pubviaroot' is the value of publish_via_partition_root.
     783              :  */
     784              : static void
     785          707 : CheckPubRelationColumnList(char *pubname, List *tables,
     786              :                            bool publish_schema, bool pubviaroot)
     787              : {
     788              :     ListCell   *lc;
     789              : 
     790         1435 :     foreach(lc, tables)
     791              :     {
     792          748 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
     793              : 
     794          748 :         if (pri->columns == NIL)
     795          492 :             continue;
     796              : 
     797              :         /*
     798              :          * Disallow specifying column list if any schema is in the
     799              :          * publication.
     800              :          *
     801              :          * XXX We could instead just forbid the case when the publication
     802              :          * tries to publish the table with a column list and a schema for that
     803              :          * table. However, if we do that then we need a restriction during
     804              :          * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
     805              :          * seem to be a good idea.
     806              :          */
     807          256 :         if (publish_schema)
     808           16 :             ereport(ERROR,
     809              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     810              :                     errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     811              :                            get_namespace_name(RelationGetNamespace(pri->relation)),
     812              :                            RelationGetRelationName(pri->relation), pubname),
     813              :                     errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
     814              : 
     815              :         /*
     816              :          * If the publication doesn't publish changes via the root partitioned
     817              :          * table, the partition's column list will be used. So disallow using
     818              :          * a column list on the partitioned table in this case.
     819              :          */
     820          240 :         if (!pubviaroot &&
     821          194 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     822            4 :             ereport(ERROR,
     823              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     824              :                      errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
     825              :                             get_namespace_name(RelationGetNamespace(pri->relation)),
     826              :                             RelationGetRelationName(pri->relation), pubname),
     827              :                      errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
     828              :                                "publish_via_partition_root")));
     829              :     }
     830          687 : }
     831              : 
     832              : /*
     833              :  * Create new publication.
     834              :  */
     835              : ObjectAddress
     836          590 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
     837              : {
     838              :     Relation    rel;
     839              :     ObjectAddress myself;
     840              :     Oid         puboid;
     841              :     bool        nulls[Natts_pg_publication];
     842              :     Datum       values[Natts_pg_publication];
     843              :     HeapTuple   tup;
     844              :     bool        publish_given;
     845              :     PublicationActions pubactions;
     846              :     bool        publish_via_partition_root_given;
     847              :     bool        publish_via_partition_root;
     848              :     bool        publish_generated_columns_given;
     849              :     char        publish_generated_columns;
     850              :     AclResult   aclresult;
     851          590 :     List       *relations = NIL;
     852          590 :     List       *exceptrelations = NIL;
     853          590 :     List       *schemaidlist = NIL;
     854              : 
     855              :     /* must have CREATE privilege on database */
     856          590 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
     857          590 :     if (aclresult != ACLCHECK_OK)
     858            4 :         aclcheck_error(aclresult, OBJECT_DATABASE,
     859            4 :                        get_database_name(MyDatabaseId));
     860              : 
     861              :     /* FOR ALL TABLES and FOR ALL SEQUENCES require superuser */
     862          586 :     if (!superuser())
     863              :     {
     864           15 :         if (stmt->for_all_tables || stmt->for_all_sequences)
     865            0 :             ereport(ERROR,
     866              :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     867              :                     errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
     868              :     }
     869              : 
     870          586 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
     871              : 
     872              :     /* Check if name is used */
     873          586 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
     874              :                              CStringGetDatum(stmt->pubname));
     875          586 :     if (OidIsValid(puboid))
     876            4 :         ereport(ERROR,
     877              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
     878              :                  errmsg("publication \"%s\" already exists",
     879              :                         stmt->pubname)));
     880              : 
     881              :     /* Form a tuple. */
     882          582 :     memset(values, 0, sizeof(values));
     883          582 :     memset(nulls, false, sizeof(nulls));
     884              : 
     885          582 :     values[Anum_pg_publication_pubname - 1] =
     886          582 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
     887          582 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
     888              : 
     889          582 :     parse_publication_options(pstate,
     890              :                               stmt->options,
     891              :                               &publish_given, &pubactions,
     892              :                               &publish_via_partition_root_given,
     893              :                               &publish_via_partition_root,
     894              :                               &publish_generated_columns_given,
     895              :                               &publish_generated_columns);
     896              : 
     897          558 :     if (stmt->for_all_sequences &&
     898           17 :         (publish_given || publish_via_partition_root_given ||
     899              :          publish_generated_columns_given))
     900            1 :         ereport(NOTICE,
     901              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     902              :                 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
     903              : 
     904          558 :     puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
     905              :                                 Anum_pg_publication_oid);
     906          558 :     values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
     907          558 :     values[Anum_pg_publication_puballtables - 1] =
     908          558 :         BoolGetDatum(stmt->for_all_tables);
     909          558 :     values[Anum_pg_publication_puballsequences - 1] =
     910          558 :         BoolGetDatum(stmt->for_all_sequences);
     911          558 :     values[Anum_pg_publication_pubinsert - 1] =
     912          558 :         BoolGetDatum(pubactions.pubinsert);
     913          558 :     values[Anum_pg_publication_pubupdate - 1] =
     914          558 :         BoolGetDatum(pubactions.pubupdate);
     915          558 :     values[Anum_pg_publication_pubdelete - 1] =
     916          558 :         BoolGetDatum(pubactions.pubdelete);
     917          558 :     values[Anum_pg_publication_pubtruncate - 1] =
     918          558 :         BoolGetDatum(pubactions.pubtruncate);
     919          558 :     values[Anum_pg_publication_pubviaroot - 1] =
     920          558 :         BoolGetDatum(publish_via_partition_root);
     921          558 :     values[Anum_pg_publication_pubgencols - 1] =
     922          558 :         CharGetDatum(publish_generated_columns);
     923              : 
     924          558 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     925              : 
     926              :     /* Insert tuple into catalog. */
     927          558 :     CatalogTupleInsert(rel, tup);
     928          558 :     heap_freetuple(tup);
     929              : 
     930          558 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
     931              : 
     932          558 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
     933              : 
     934              :     /* Make the changes visible. */
     935          558 :     CommandCounterIncrement();
     936              : 
     937              :     /* Associate objects with the publication. */
     938          558 :     ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
     939              :                                &exceptrelations, &schemaidlist);
     940              : 
     941          546 :     if (stmt->for_all_tables)
     942              :     {
     943              :         /* Process EXCEPT table list */
     944          100 :         if (exceptrelations != NIL)
     945              :         {
     946              :             List       *rels;
     947              : 
     948           35 :             rels = OpenTableList(exceptrelations);
     949           35 :             PublicationAddTables(puboid, rels, true, NULL);
     950           31 :             CloseTableList(rels);
     951              :         }
     952              : 
     953              :         /*
     954              :          * Invalidate relcache so that publication info is rebuilt. Sequences
     955              :          * publication doesn't require invalidation, as replica identity
     956              :          * checks don't apply to them.
     957              :          */
     958           96 :         CacheInvalidateRelcacheAll();
     959              :     }
     960          446 :     else if (!stmt->for_all_sequences)
     961              :     {
     962              :         /* FOR TABLES IN SCHEMA requires superuser */
     963          434 :         if (schemaidlist != NIL && !superuser())
     964            4 :             ereport(ERROR,
     965              :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     966              :                     errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
     967              : 
     968          430 :         if (relations != NIL)
     969              :         {
     970              :             List       *rels;
     971              : 
     972          285 :             rels = OpenTableList(relations);
     973          265 :             TransformPubWhereClauses(rels, pstate->p_sourcetext,
     974              :                                      publish_via_partition_root);
     975              : 
     976          249 :             CheckPubRelationColumnList(stmt->pubname, rels,
     977              :                                        schemaidlist != NIL,
     978              :                                        publish_via_partition_root);
     979              : 
     980          245 :             PublicationAddTables(puboid, rels, true, NULL);
     981          228 :             CloseTableList(rels);
     982              :         }
     983              : 
     984          373 :         if (schemaidlist != NIL)
     985              :         {
     986              :             /*
     987              :              * Schema lock is held until the publication is created to prevent
     988              :              * concurrent schema deletion.
     989              :              */
     990          100 :             LockSchemaList(schemaidlist);
     991          100 :             PublicationAddSchemas(puboid, schemaidlist, true, NULL);
     992              :         }
     993              :     }
     994              : 
     995          477 :     table_close(rel, RowExclusiveLock);
     996              : 
     997          477 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
     998              : 
     999              :     /*
    1000              :      * We don't need this warning message when wal_level >= 'replica' since
    1001              :      * logical decoding is automatically enabled upon logical slot
    1002              :      * creation.
    1003              :      */
    1004          477 :     if (wal_level < WAL_LEVEL_REPLICA)
    1005           93 :         ereport(WARNING,
    1006              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1007              :                  errmsg("logical decoding must be enabled to publish logical changes"),
    1008              :                  errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
    1009              : 
    1010          477 :     return myself;
    1011              : }
    1012              : 
    1013              : /*
    1014              :  * Change options of a publication.
    1015              :  */
    1016              : static void
    1017           84 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
    1018              :                         Relation rel, HeapTuple tup)
    1019              : {
    1020              :     bool        nulls[Natts_pg_publication];
    1021              :     bool        replaces[Natts_pg_publication];
    1022              :     Datum       values[Natts_pg_publication];
    1023              :     bool        publish_given;
    1024              :     PublicationActions pubactions;
    1025              :     bool        publish_via_partition_root_given;
    1026              :     bool        publish_via_partition_root;
    1027              :     bool        publish_generated_columns_given;
    1028              :     char        publish_generated_columns;
    1029              :     ObjectAddress obj;
    1030              :     Form_pg_publication pubform;
    1031           84 :     List       *root_relids = NIL;
    1032              :     ListCell   *lc;
    1033              : 
    1034           84 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1035              : 
    1036           84 :     parse_publication_options(pstate,
    1037              :                               stmt->options,
    1038              :                               &publish_given, &pubactions,
    1039              :                               &publish_via_partition_root_given,
    1040              :                               &publish_via_partition_root,
    1041              :                               &publish_generated_columns_given,
    1042              :                               &publish_generated_columns);
    1043              : 
    1044           84 :     if (pubform->puballsequences &&
    1045            8 :         (publish_given || publish_via_partition_root_given ||
    1046              :          publish_generated_columns_given))
    1047            8 :         ereport(NOTICE,
    1048              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1049              :                 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
    1050              : 
    1051              :     /*
    1052              :      * If the publication doesn't publish changes via the root partitioned
    1053              :      * table, the partition's row filter and column list will be used. So
    1054              :      * disallow using WHERE clause and column lists on partitioned table in
    1055              :      * this case.
    1056              :      */
    1057           84 :     if (!pubform->puballtables && publish_via_partition_root_given &&
    1058           53 :         !publish_via_partition_root)
    1059              :     {
    1060              :         /*
    1061              :          * Lock the publication so nobody else can do anything with it. This
    1062              :          * prevents concurrent alter to add partitioned table(s) with WHERE
    1063              :          * clause(s) and/or column lists which we don't allow when not
    1064              :          * publishing via root.
    1065              :          */
    1066           32 :         LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
    1067              :                            AccessShareLock);
    1068              : 
    1069           32 :         root_relids = GetIncludedPublicationRelations(pubform->oid,
    1070              :                                                       PUBLICATION_PART_ROOT);
    1071              : 
    1072           56 :         foreach(lc, root_relids)
    1073              :         {
    1074           32 :             Oid         relid = lfirst_oid(lc);
    1075              :             HeapTuple   rftuple;
    1076              :             char        relkind;
    1077              :             char       *relname;
    1078              :             bool        has_rowfilter;
    1079              :             bool        has_collist;
    1080              : 
    1081              :             /*
    1082              :              * Beware: we don't have lock on the relations, so cope silently
    1083              :              * with the cache lookups returning NULL.
    1084              :              */
    1085              : 
    1086           32 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1087              :                                       ObjectIdGetDatum(relid),
    1088              :                                       ObjectIdGetDatum(pubform->oid));
    1089           32 :             if (!HeapTupleIsValid(rftuple))
    1090            0 :                 continue;
    1091           32 :             has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
    1092           32 :             has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
    1093           32 :             if (!has_rowfilter && !has_collist)
    1094              :             {
    1095            8 :                 ReleaseSysCache(rftuple);
    1096            8 :                 continue;
    1097              :             }
    1098              : 
    1099           24 :             relkind = get_rel_relkind(relid);
    1100           24 :             if (relkind != RELKIND_PARTITIONED_TABLE)
    1101              :             {
    1102           16 :                 ReleaseSysCache(rftuple);
    1103           16 :                 continue;
    1104              :             }
    1105            8 :             relname = get_rel_name(relid);
    1106            8 :             if (relname == NULL)    /* table concurrently dropped */
    1107              :             {
    1108            0 :                 ReleaseSysCache(rftuple);
    1109            0 :                 continue;
    1110              :             }
    1111              : 
    1112            8 :             if (has_rowfilter)
    1113            4 :                 ereport(ERROR,
    1114              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1115              :                          errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1116              :                                 "publish_via_partition_root",
    1117              :                                 stmt->pubname),
    1118              :                          errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1119              :                                    relname, "publish_via_partition_root")));
    1120              :             Assert(has_collist);
    1121            4 :             ereport(ERROR,
    1122              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1123              :                      errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
    1124              :                             "publish_via_partition_root",
    1125              :                             stmt->pubname),
    1126              :                      errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
    1127              :                                relname, "publish_via_partition_root")));
    1128              :         }
    1129              :     }
    1130              : 
    1131              :     /* Everything ok, form a new tuple. */
    1132           76 :     memset(values, 0, sizeof(values));
    1133           76 :     memset(nulls, false, sizeof(nulls));
    1134           76 :     memset(replaces, false, sizeof(replaces));
    1135              : 
    1136           76 :     if (publish_given)
    1137              :     {
    1138           22 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
    1139           22 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
    1140              : 
    1141           22 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
    1142           22 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
    1143              : 
    1144           22 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
    1145           22 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
    1146              : 
    1147           22 :         values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
    1148           22 :         replaces[Anum_pg_publication_pubtruncate - 1] = true;
    1149              :     }
    1150              : 
    1151           76 :     if (publish_via_partition_root_given)
    1152              :     {
    1153           46 :         values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
    1154           46 :         replaces[Anum_pg_publication_pubviaroot - 1] = true;
    1155              :     }
    1156              : 
    1157           76 :     if (publish_generated_columns_given)
    1158              :     {
    1159            8 :         values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
    1160            8 :         replaces[Anum_pg_publication_pubgencols - 1] = true;
    1161              :     }
    1162              : 
    1163           76 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
    1164              :                             replaces);
    1165              : 
    1166              :     /* Update the catalog. */
    1167           76 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    1168              : 
    1169           76 :     CommandCounterIncrement();
    1170              : 
    1171           76 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1172              : 
    1173              :     /* Invalidate the relcache. */
    1174           76 :     if (pubform->puballtables)
    1175              :     {
    1176           13 :         CacheInvalidateRelcacheAll();
    1177              :     }
    1178              :     else
    1179              :     {
    1180           63 :         List       *relids = NIL;
    1181           63 :         List       *schemarelids = NIL;
    1182              : 
    1183              :         /*
    1184              :          * For any partitioned tables contained in the publication, we must
    1185              :          * invalidate all partitions contained in the respective partition
    1186              :          * trees, not just those explicitly mentioned in the publication.
    1187              :          */
    1188           63 :         if (root_relids == NIL)
    1189           39 :             relids = GetIncludedPublicationRelations(pubform->oid,
    1190              :                                                      PUBLICATION_PART_ALL);
    1191              :         else
    1192              :         {
    1193              :             /*
    1194              :              * We already got tables explicitly mentioned in the publication.
    1195              :              * Now get all partitions for the partitioned table in the list.
    1196              :              */
    1197           48 :             foreach(lc, root_relids)
    1198           24 :                 relids = GetPubPartitionOptionRelations(relids,
    1199              :                                                         PUBLICATION_PART_ALL,
    1200              :                                                         lfirst_oid(lc));
    1201              :         }
    1202              : 
    1203           63 :         schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
    1204              :                                                         PUBLICATION_PART_ALL);
    1205           63 :         relids = list_concat_unique_oid(relids, schemarelids);
    1206              : 
    1207           63 :         InvalidatePublicationRels(relids);
    1208              :     }
    1209              : 
    1210           76 :     ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
    1211           76 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    1212              :                                      (Node *) stmt);
    1213              : 
    1214           76 :     InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
    1215           76 : }
    1216              : 
    1217              : /*
    1218              :  * Invalidate the relations.
    1219              :  */
    1220              : void
    1221         1600 : InvalidatePublicationRels(List *relids)
    1222              : {
    1223              :     /*
    1224              :      * We don't want to send too many individual messages, at some point it's
    1225              :      * cheaper to just reset whole relcache.
    1226              :      */
    1227         1600 :     if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
    1228              :     {
    1229              :         ListCell   *lc;
    1230              : 
    1231        13707 :         foreach(lc, relids)
    1232        12107 :             CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
    1233              :     }
    1234              :     else
    1235            0 :         CacheInvalidateRelcacheAll();
    1236         1600 : }
    1237              : 
    1238              : /*
    1239              :  * Add or remove table to/from publication.
    1240              :  */
    1241              : static void
    1242          617 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
    1243              :                        List *tables, const char *queryString,
    1244              :                        bool publish_schema)
    1245              : {
    1246          617 :     List       *rels = NIL;
    1247          617 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1248          617 :     Oid         pubid = pubform->oid;
    1249              : 
    1250              :     /*
    1251              :      * Nothing to do if no objects, except in SET: for that it is quite
    1252              :      * possible that user has not specified any tables in which case we need
    1253              :      * to remove all the existing tables.
    1254              :      */
    1255          617 :     if (!tables && stmt->action != AP_SetObjects)
    1256           48 :         return;
    1257              : 
    1258          569 :     rels = OpenTableList(tables);
    1259              : 
    1260          569 :     if (stmt->action == AP_AddObjects)
    1261              :     {
    1262          186 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1263              : 
    1264          174 :         publish_schema |= is_schema_publication(pubid);
    1265              : 
    1266          174 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1267          174 :                                    pubform->pubviaroot);
    1268              : 
    1269          166 :         PublicationAddTables(pubid, rels, false, stmt);
    1270              :     }
    1271          383 :     else if (stmt->action == AP_DropObjects)
    1272           66 :         PublicationDropTables(pubid, rels, false);
    1273              :     else                        /* AP_SetObjects */
    1274              :     {
    1275          317 :         List       *oldrelids = NIL;
    1276          317 :         List       *delrels = NIL;
    1277              :         ListCell   *oldlc;
    1278              : 
    1279          317 :         if (stmt->for_all_tables || stmt->for_all_sequences)
    1280              :         {
    1281              :             /*
    1282              :              * In FOR ALL TABLES mode, relations are tracked as exclusions
    1283              :              * (EXCEPT TABLES). Fetch the current excluded relations so they
    1284              :              * can be reconciled with the specified EXCEPT list.
    1285              :              *
    1286              :              * This applies only if the existing publication is already
    1287              :              * defined as FOR ALL TABLES; otherwise, there are no exclusion
    1288              :              * entries to process.
    1289              :              */
    1290           21 :             if (pubform->puballtables)
    1291              :             {
    1292           17 :                 oldrelids = GetExcludedPublicationTables(pubid,
    1293              :                                                          PUBLICATION_PART_ROOT);
    1294              :             }
    1295              :         }
    1296              :         else
    1297              :         {
    1298          296 :             oldrelids = GetIncludedPublicationRelations(pubid,
    1299              :                                                         PUBLICATION_PART_ROOT);
    1300              : 
    1301          296 :             TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
    1302              : 
    1303          284 :             CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
    1304          284 :                                        pubform->pubviaroot);
    1305              :         }
    1306              : 
    1307              :         /*
    1308              :          * To recreate the relation list for the publication, look for
    1309              :          * existing relations that do not need to be dropped.
    1310              :          */
    1311          571 :         foreach(oldlc, oldrelids)
    1312              :         {
    1313          282 :             Oid         oldrelid = lfirst_oid(oldlc);
    1314              :             ListCell   *newlc;
    1315              :             PublicationRelInfo *oldrel;
    1316          282 :             bool        found = false;
    1317              :             HeapTuple   rftuple;
    1318          282 :             Node       *oldrelwhereclause = NULL;
    1319          282 :             Bitmapset  *oldcolumns = NULL;
    1320              : 
    1321              :             /* look up the cache for the old relmap */
    1322          282 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1323              :                                       ObjectIdGetDatum(oldrelid),
    1324              :                                       ObjectIdGetDatum(pubid));
    1325              : 
    1326              :             /*
    1327              :              * See if the existing relation currently has a WHERE clause or a
    1328              :              * column list. We need to compare those too.
    1329              :              */
    1330          282 :             if (HeapTupleIsValid(rftuple))
    1331              :             {
    1332          282 :                 bool        isnull = true;
    1333              :                 Datum       whereClauseDatum;
    1334              :                 Datum       columnListDatum;
    1335              : 
    1336              :                 /* Load the WHERE clause for this table. */
    1337          282 :                 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1338              :                                                    Anum_pg_publication_rel_prqual,
    1339              :                                                    &isnull);
    1340          282 :                 if (!isnull)
    1341          139 :                     oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
    1342              : 
    1343              :                 /* Transform the int2vector column list to a bitmap. */
    1344          282 :                 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
    1345              :                                                   Anum_pg_publication_rel_prattrs,
    1346              :                                                   &isnull);
    1347              : 
    1348          282 :                 if (!isnull)
    1349           90 :                     oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
    1350              : 
    1351          282 :                 ReleaseSysCache(rftuple);
    1352              :             }
    1353              : 
    1354          538 :             foreach(newlc, rels)
    1355              :             {
    1356              :                 PublicationRelInfo *newpubrel;
    1357              :                 Oid         newrelid;
    1358          278 :                 Bitmapset  *newcolumns = NULL;
    1359              : 
    1360          278 :                 newpubrel = (PublicationRelInfo *) lfirst(newlc);
    1361          278 :                 newrelid = RelationGetRelid(newpubrel->relation);
    1362              : 
    1363              :                 /*
    1364              :                  * Validate the column list.  If the column list or WHERE
    1365              :                  * clause changes, then the validation done here will be
    1366              :                  * duplicated inside PublicationAddTables().  The validation
    1367              :                  * is cheap enough that that seems harmless.
    1368              :                  */
    1369          278 :                 newcolumns = pub_collist_validate(newpubrel->relation,
    1370              :                                                   newpubrel->columns);
    1371              : 
    1372              :                 /*
    1373              :                  * Check if any of the new set of relations matches with the
    1374              :                  * existing relations in the publication. Additionally, if the
    1375              :                  * relation has an associated WHERE clause, check the WHERE
    1376              :                  * expressions also match. Same for the column list. Drop the
    1377              :                  * rest.
    1378              :                  */
    1379          270 :                 if (newrelid == oldrelid)
    1380              :                 {
    1381          194 :                     if (equal(oldrelwhereclause, newpubrel->whereClause) &&
    1382           56 :                         bms_equal(oldcolumns, newcolumns))
    1383              :                     {
    1384           14 :                         found = true;
    1385           14 :                         break;
    1386              :                     }
    1387              :                 }
    1388              :             }
    1389              : 
    1390              :             /*
    1391              :              * Add the non-matched relations to a list so that they can be
    1392              :              * dropped.
    1393              :              */
    1394          274 :             if (!found)
    1395              :             {
    1396          260 :                 oldrel = palloc_object(PublicationRelInfo);
    1397          260 :                 oldrel->whereClause = NULL;
    1398          260 :                 oldrel->columns = NIL;
    1399          260 :                 oldrel->except = false;
    1400          260 :                 oldrel->relation = table_open(oldrelid,
    1401              :                                               ShareUpdateExclusiveLock);
    1402          260 :                 delrels = lappend(delrels, oldrel);
    1403              :             }
    1404              :         }
    1405              : 
    1406              :         /* And drop them. */
    1407          289 :         PublicationDropTables(pubid, delrels, true);
    1408              : 
    1409              :         /*
    1410              :          * Don't bother calculating the difference for adding, we'll catch and
    1411              :          * skip existing ones when doing catalog update.
    1412              :          */
    1413          289 :         PublicationAddTables(pubid, rels, true, stmt);
    1414              : 
    1415          289 :         CloseTableList(delrels);
    1416              :     }
    1417              : 
    1418          481 :     CloseTableList(rels);
    1419              : }
    1420              : 
    1421              : /*
    1422              :  * Alter the publication schemas.
    1423              :  *
    1424              :  * Add or remove schemas to/from publication.
    1425              :  */
    1426              : static void
    1427          529 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
    1428              :                         HeapTuple tup, List *schemaidlist)
    1429              : {
    1430          529 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1431              : 
    1432              :     /*
    1433              :      * Nothing to do if no objects, except in SET: for that it is quite
    1434              :      * possible that user has not specified any schemas in which case we need
    1435              :      * to remove all the existing schemas.
    1436              :      */
    1437          529 :     if (!schemaidlist && stmt->action != AP_SetObjects)
    1438          192 :         return;
    1439              : 
    1440              :     /*
    1441              :      * Schema lock is held until the publication is altered to prevent
    1442              :      * concurrent schema deletion.
    1443              :      */
    1444          337 :     LockSchemaList(schemaidlist);
    1445          337 :     if (stmt->action == AP_AddObjects)
    1446              :     {
    1447              :         ListCell   *lc;
    1448              :         List       *reloids;
    1449              : 
    1450           23 :         reloids = GetIncludedPublicationRelations(pubform->oid,
    1451              :                                                   PUBLICATION_PART_ROOT);
    1452              : 
    1453           32 :         foreach(lc, reloids)
    1454              :         {
    1455              :             HeapTuple   coltuple;
    1456              : 
    1457           13 :             coltuple = SearchSysCache2(PUBLICATIONRELMAP,
    1458              :                                        ObjectIdGetDatum(lfirst_oid(lc)),
    1459              :                                        ObjectIdGetDatum(pubform->oid));
    1460              : 
    1461           13 :             if (!HeapTupleIsValid(coltuple))
    1462            0 :                 continue;
    1463              : 
    1464              :             /*
    1465              :              * Disallow adding schema if column list is already part of the
    1466              :              * publication. See CheckPubRelationColumnList.
    1467              :              */
    1468           13 :             if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
    1469            4 :                 ereport(ERROR,
    1470              :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1471              :                         errmsg("cannot add schema to publication \"%s\"",
    1472              :                                stmt->pubname),
    1473              :                         errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
    1474              : 
    1475            9 :             ReleaseSysCache(coltuple);
    1476              :         }
    1477              : 
    1478           19 :         PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
    1479              :     }
    1480          314 :     else if (stmt->action == AP_DropObjects)
    1481           25 :         PublicationDropSchemas(pubform->oid, schemaidlist, false);
    1482              :     else                        /* AP_SetObjects */
    1483              :     {
    1484          289 :         List       *oldschemaids = GetPublicationSchemas(pubform->oid);
    1485          289 :         List       *delschemas = NIL;
    1486              : 
    1487              :         /* Identify which schemas should be dropped */
    1488          289 :         delschemas = list_difference_oid(oldschemaids, schemaidlist);
    1489              : 
    1490              :         /*
    1491              :          * Schema lock is held until the publication is altered to prevent
    1492              :          * concurrent schema deletion.
    1493              :          */
    1494          289 :         LockSchemaList(delschemas);
    1495              : 
    1496              :         /* And drop them */
    1497          289 :         PublicationDropSchemas(pubform->oid, delschemas, true);
    1498              : 
    1499              :         /*
    1500              :          * Don't bother calculating the difference for adding, we'll catch and
    1501              :          * skip existing ones when doing catalog update.
    1502              :          */
    1503          289 :         PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
    1504              :     }
    1505              : }
    1506              : 
    1507              : /*
    1508              :  * Check if relations and schemas can be in a given publication and throw
    1509              :  * appropriate error if not.
    1510              :  */
    1511              : static void
    1512          681 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
    1513              :                       List *tables, List *schemaidlist)
    1514              : {
    1515          681 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
    1516              : 
    1517          681 :     if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
    1518           67 :         schemaidlist && !superuser())
    1519            4 :         ereport(ERROR,
    1520              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1521              :                  errmsg("must be superuser to add or set schemas")));
    1522              : 
    1523          677 :     if (stmt->for_all_tables && !superuser())
    1524            8 :         ereport(ERROR,
    1525              :                 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1526              :                 errmsg("must be superuser to set ALL TABLES"));
    1527              : 
    1528          669 :     if (stmt->for_all_sequences && !superuser())
    1529            4 :         ereport(ERROR,
    1530              :                 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1531              :                 errmsg("must be superuser to set ALL SEQUENCES"));
    1532              : 
    1533              :     /*
    1534              :      * Check that user is allowed to manipulate the publication tables in
    1535              :      * schema
    1536              :      */
    1537          665 :     if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
    1538              :     {
    1539           12 :         if (pubform->puballtables && pubform->puballsequences)
    1540            0 :             ereport(ERROR,
    1541              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1542              :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
    1543              :                            NameStr(pubform->pubname)),
    1544              :                     errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
    1545           12 :         else if (pubform->puballtables)
    1546           12 :             ereport(ERROR,
    1547              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1548              :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1549              :                            NameStr(pubform->pubname)),
    1550              :                     errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
    1551              :         else
    1552            0 :             ereport(ERROR,
    1553              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1554              :                     errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
    1555              :                            NameStr(pubform->pubname)),
    1556              :                     errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
    1557              :     }
    1558              : 
    1559              :     /* Check that user is allowed to manipulate the publication tables. */
    1560          653 :     if (tables && (pubform->puballtables || pubform->puballsequences))
    1561              :     {
    1562           12 :         if (pubform->puballtables && pubform->puballsequences)
    1563            0 :             ereport(ERROR,
    1564              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1565              :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
    1566              :                            NameStr(pubform->pubname)),
    1567              :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
    1568           12 :         else if (pubform->puballtables)
    1569           12 :             ereport(ERROR,
    1570              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1571              :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES",
    1572              :                            NameStr(pubform->pubname)),
    1573              :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
    1574              :         else
    1575            0 :             ereport(ERROR,
    1576              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1577              :                     errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
    1578              :                            NameStr(pubform->pubname)),
    1579              :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
    1580              :     }
    1581              : 
    1582          641 :     if (stmt->for_all_tables || stmt->for_all_sequences)
    1583              :     {
    1584              :         /*
    1585              :          * If the publication already contains specific tables or schemas, we
    1586              :          * prevent switching to an ALL state.
    1587              :          */
    1588           78 :         if (is_table_publication(pubform->oid) ||
    1589           33 :             is_schema_publication(pubform->oid))
    1590              :         {
    1591           24 :             ereport(ERROR,
    1592              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1593              :                     stmt->for_all_tables ?
    1594              :                     errmsg("publication \"%s\" does not support ALL TABLES operations", NameStr(pubform->pubname)) :
    1595              :                     errmsg("publication \"%s\" does not support ALL SEQUENCES operations", NameStr(pubform->pubname)),
    1596              :                     errdetail("This operation requires the publication to be defined as FOR ALL TABLES/SEQUENCES or to be empty."));
    1597              :         }
    1598              :     }
    1599          617 : }
    1600              : 
    1601              : /*
    1602              :  * Update FOR ALL TABLES / FOR ALL SEQUENCES flags of a publication.
    1603              :  */
    1604              : static void
    1605          517 : AlterPublicationAllFlags(AlterPublicationStmt *stmt, Relation rel,
    1606              :                          HeapTuple tup)
    1607              : {
    1608              :     Form_pg_publication pubform;
    1609          517 :     bool        nulls[Natts_pg_publication] = {0};
    1610          517 :     bool        replaces[Natts_pg_publication] = {0};
    1611          517 :     Datum       values[Natts_pg_publication] = {0};
    1612          517 :     bool        dirty = false;
    1613              : 
    1614          517 :     if (!stmt->for_all_tables && !stmt->for_all_sequences)
    1615          496 :         return;
    1616              : 
    1617           21 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1618              : 
    1619              :     /* Update FOR ALL TABLES flag if changed */
    1620           21 :     if (stmt->for_all_tables != pubform->puballtables)
    1621              :     {
    1622            8 :         values[Anum_pg_publication_puballtables - 1] =
    1623            8 :             BoolGetDatum(stmt->for_all_tables);
    1624            8 :         replaces[Anum_pg_publication_puballtables - 1] = true;
    1625            8 :         dirty = true;
    1626              :     }
    1627              : 
    1628              :     /* Update FOR ALL SEQUENCES flag if changed */
    1629           21 :     if (stmt->for_all_sequences != pubform->puballsequences)
    1630              :     {
    1631           12 :         values[Anum_pg_publication_puballsequences - 1] =
    1632           12 :             BoolGetDatum(stmt->for_all_sequences);
    1633           12 :         replaces[Anum_pg_publication_puballsequences - 1] = true;
    1634           12 :         dirty = true;
    1635              :     }
    1636              : 
    1637           21 :     if (dirty)
    1638              :     {
    1639           12 :         tup = heap_modify_tuple(tup, RelationGetDescr(rel), values,
    1640              :                                 nulls, replaces);
    1641           12 :         CatalogTupleUpdate(rel, &tup->t_self, tup);
    1642           12 :         CommandCounterIncrement();
    1643              : 
    1644              :         /* If the FOR ALL TABLES flag changed, we must invalidate all relcache entries */
    1645           12 :         if (replaces[Anum_pg_publication_puballtables - 1])
    1646            8 :             CacheInvalidateRelcacheAll();
    1647              :     }
    1648              : }
    1649              : 
    1650              : /*
    1651              :  * Alter the existing publication.
    1652              :  *
    1653              :  * This is the dispatcher function for AlterPublicationOptions,
    1654              :  * AlterPublicationTables, AlterPublicationSchemas and AlterPublicationAllFlags.
    1655              :  */
    1656              : void
    1657          777 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
    1658              : {
    1659              :     Relation    rel;
    1660              :     HeapTuple   tup;
    1661              :     Form_pg_publication pubform;
    1662              : 
    1663          777 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1664              : 
    1665          777 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
    1666              :                               CStringGetDatum(stmt->pubname));
    1667              : 
    1668          777 :     if (!HeapTupleIsValid(tup))
    1669            0 :         ereport(ERROR,
    1670              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    1671              :                  errmsg("publication \"%s\" does not exist",
    1672              :                         stmt->pubname)));
    1673              : 
    1674          777 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1675              : 
    1676              :     /* must be owner */
    1677          777 :     if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
    1678            0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    1679            0 :                        stmt->pubname);
    1680              : 
    1681          777 :     if (stmt->options)
    1682           84 :         AlterPublicationOptions(pstate, stmt, rel, tup);
    1683              :     else
    1684              :     {
    1685          693 :         List       *relations = NIL;
    1686          693 :         List       *exceptrelations = NIL;
    1687          693 :         List       *schemaidlist = NIL;
    1688          693 :         Oid         pubid = pubform->oid;
    1689              : 
    1690          693 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
    1691              :                                    &exceptrelations, &schemaidlist);
    1692              : 
    1693          681 :         CheckAlterPublication(stmt, tup, relations, schemaidlist);
    1694              : 
    1695          617 :         heap_freetuple(tup);
    1696              : 
    1697              :         /* Lock the publication so nobody else can do anything with it. */
    1698          617 :         LockDatabaseObject(PublicationRelationId, pubid, 0,
    1699              :                            AccessExclusiveLock);
    1700              : 
    1701              :         /*
    1702              :          * It is possible that by the time we acquire the lock on the publication,
    1703              :          * concurrent DDL has removed it. We can test this by checking the
    1704              :          * existence of the publication. We get the tuple again to avoid the risk
    1705              :          * of any publication option having been changed in the meantime.
    1706              :          */
    1707          617 :         tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1708          617 :         if (!HeapTupleIsValid(tup))
    1709            0 :             ereport(ERROR,
    1710              :                     errcode(ERRCODE_UNDEFINED_OBJECT),
    1711              :                     errmsg("publication \"%s\" does not exist",
    1712              :                            stmt->pubname));
    1713              : 
    1714          617 :         relations = list_concat(relations, exceptrelations);
    1715          617 :         AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
    1716              :                                schemaidlist != NIL);
    1717          529 :         AlterPublicationSchemas(stmt, tup, schemaidlist);
    1718          517 :         AlterPublicationAllFlags(stmt, rel, tup);
    1719              :     }
    1720              : 
    1721              :     /* Cleanup. */
    1722          593 :     heap_freetuple(tup);
    1723          593 :     table_close(rel, RowExclusiveLock);
    1724          593 : }
    1725              : 
    1726              : /*
    1727              :  * Remove relation from publication by mapping OID.
    1728              :  */
    1729              : void
    1730          589 : RemovePublicationRelById(Oid proid)
    1731              : {
    1732              :     Relation    rel;
    1733              :     HeapTuple   tup;
    1734              :     Form_pg_publication_rel pubrel;
    1735          589 :     List       *relids = NIL;
    1736              : 
    1737          589 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
    1738              : 
    1739          589 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
    1740              : 
    1741          589 :     if (!HeapTupleIsValid(tup))
    1742            0 :         elog(ERROR, "cache lookup failed for publication table %u",
    1743              :              proid);
    1744              : 
    1745          589 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
    1746              : 
    1747              :     /*
    1748              :      * Invalidate relcache so that publication info is rebuilt.
    1749              :      *
    1750              :      * For partitioned tables, we must invalidate all partitions contained
    1751              :      * in the respective partition hierarchies, not just the one explicitly
    1752              :      * mentioned in the publication. This is required because we implicitly
    1753              :      * publish the child tables when the parent table is published.
    1754              :      */
    1755          589 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
    1756              :                                             pubrel->prrelid);
    1757              : 
    1758          589 :     InvalidatePublicationRels(relids);
    1759              : 
    1760          589 :     CatalogTupleDelete(rel, &tup->t_self);
    1761              : 
    1762          589 :     ReleaseSysCache(tup);
    1763              : 
    1764          589 :     table_close(rel, RowExclusiveLock);
    1765          589 : }
    1766              : 
    1767              : /*
    1768              :  * Remove the publication by mapping OID.
    1769              :  */
    1770              : void
    1771          352 : RemovePublicationById(Oid pubid)
    1772              : {
    1773              :     Relation    rel;
    1774              :     HeapTuple   tup;
    1775              :     Form_pg_publication pubform;
    1776              : 
    1777          352 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    1778              : 
    1779          352 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    1780          352 :     if (!HeapTupleIsValid(tup))
    1781            0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
    1782              : 
    1783          352 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    1784              : 
    1785              :     /* Invalidate relcache so that publication info is rebuilt. */
    1786          352 :     if (pubform->puballtables)
    1787           75 :         CacheInvalidateRelcacheAll();
    1788              : 
    1789          352 :     CatalogTupleDelete(rel, &tup->t_self);
    1790              : 
    1791          352 :     ReleaseSysCache(tup);
    1792              : 
    1793          352 :     table_close(rel, RowExclusiveLock);
    1794          352 : }
    1795              : 
    1796              : /*
    1797              :  * Remove schema from publication by mapping OID.
    1798              :  */
    1799              : void
    1800          127 : RemovePublicationSchemaById(Oid psoid)
    1801              : {
    1802              :     Relation    rel;
    1803              :     HeapTuple   tup;
    1804          127 :     List       *schemaRels = NIL;
    1805              :     Form_pg_publication_namespace pubsch;
    1806              : 
    1807          127 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
    1808              : 
    1809          127 :     tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
    1810              : 
    1811          127 :     if (!HeapTupleIsValid(tup))
    1812            0 :         elog(ERROR, "cache lookup failed for publication schema %u", psoid);
    1813              : 
    1814          127 :     pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
    1815              : 
    1816              :     /*
    1817              :      * Invalidate relcache so that publication info is rebuilt. See
    1818              :      * RemovePublicationRelById for why we need to consider all the
    1819              :      * partitions.
    1820              :      */
    1821          127 :     schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
    1822              :                                                PUBLICATION_PART_ALL);
    1823          127 :     InvalidatePublicationRels(schemaRels);
    1824              : 
    1825          127 :     CatalogTupleDelete(rel, &tup->t_self);
    1826              : 
    1827          127 :     ReleaseSysCache(tup);
    1828              : 
    1829          127 :     table_close(rel, RowExclusiveLock);
    1830          127 : }
    1831              : 
    1832              : /*
    1833              :  * Open relations specified by a PublicationTable list.
    1834              :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
    1835              :  * add them to a publication.
    1836              :  */
    1837              : static List *
    1838          889 : OpenTableList(List *tables)
    1839              : {
    1840          889 :     List       *relids = NIL;
    1841          889 :     List       *rels = NIL;
    1842              :     ListCell   *lc;
    1843          889 :     List       *relids_with_rf = NIL;
    1844          889 :     List       *relids_with_collist = NIL;
    1845              : 
    1846              :     /*
    1847              :      * Open, lock (ShareUpdateExclusiveLock), and check all the explicitly-specified relations
    1848              :      */
    1849         1805 :     foreach(lc, tables)
    1850              :     {
    1851          936 :         PublicationTable *t = lfirst_node(PublicationTable, lc);
    1852          936 :         bool        recurse = t->relation->inh;
    1853              :         Relation    rel;
    1854              :         Oid         myrelid;
    1855              :         PublicationRelInfo *pub_rel;
    1856              : 
    1857              :         /* Allow query cancel in case this takes a long time */
    1858          936 :         CHECK_FOR_INTERRUPTS();
    1859              : 
    1860          936 :         rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
    1861          932 :         myrelid = RelationGetRelid(rel);
    1862              : 
    1863              :         /*
    1864              :          * Filter out duplicates if user specifies "foo, foo".
    1865              :          *
    1866              :          * Note that this algorithm is known to not be very efficient (O(N^2)),
    1867              :          * but given that it only works on the list of tables given to us by the
    1868              :          * user, it's deemed acceptable.
    1869              :          */
    1870          932 :         if (list_member_oid(relids, myrelid))
    1871              :         {
    1872              :             /* Disallow duplicate tables if there are any with row filters. */
    1873           16 :             if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
    1874            8 :                 ereport(ERROR,
    1875              :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1876              :                          errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1877              :                                 RelationGetRelationName(rel))));
    1878              : 
    1879              :             /* Disallow duplicate tables if there are any with column lists. */
    1880            8 :             if (t->columns || list_member_oid(relids_with_collist, myrelid))
    1881            8 :                 ereport(ERROR,
    1882              :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
    1883              :                          errmsg("conflicting or redundant column lists for table \"%s\"",
    1884              :                                 RelationGetRelationName(rel))));
    1885              : 
    1886            0 :             table_close(rel, ShareUpdateExclusiveLock);
    1887            0 :             continue;
    1888              :         }
    1889              : 
    1890          916 :         pub_rel = palloc_object(PublicationRelInfo);
    1891          916 :         pub_rel->relation = rel;
    1892          916 :         pub_rel->whereClause = t->whereClause;
    1893          916 :         pub_rel->columns = t->columns;
    1894          916 :         pub_rel->except = t->except;
    1895          916 :         rels = lappend(rels, pub_rel);
    1896          916 :         relids = lappend_oid(relids, myrelid);
    1897              : 
    1898          916 :         if (t->whereClause)
    1899          260 :             relids_with_rf = lappend_oid(relids_with_rf, myrelid);
    1900              : 
    1901          916 :         if (t->columns)
    1902          260 :             relids_with_collist = lappend_oid(relids_with_collist, myrelid);
    1903              : 
    1904              :         /*
    1905              :          * Add children of this rel, if requested, so that they too are added
    1906              :          * to the publication.  A partitioned table can't have any inheritance
    1907              :          * children other than its partitions, which need not be explicitly
    1908              :          * added to the publication.
    1909              :          */
    1910          916 :         if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
    1911              :         {
    1912              :             List       *children;
    1913              :             ListCell   *child;
    1914              : 
    1915          764 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
    1916              :                                            NULL);
    1917              : 
    1918         1543 :             foreach(child, children)
    1919              :             {
    1920          779 :                 Oid         childrelid = lfirst_oid(child);
    1921              : 
    1922              :                 /* Allow query cancel in case this takes a long time */
    1923          779 :                 CHECK_FOR_INTERRUPTS();
    1924              : 
    1925              :                 /*
    1926              :                  * Skip duplicates if user specified both parent and child
    1927              :                  * tables.
    1928              :                  */
    1929          779 :                 if (list_member_oid(relids, childrelid))
    1930              :                 {
    1931              :                     /*
    1932              :                      * We don't allow specifying a row filter for both the parent
    1933              :                      * and a child table at the same time, as it is not very
    1934              :                      * clear which one should be given preference.
    1935              :                      */
    1936          764 :                     if (childrelid != myrelid &&
    1937            0 :                         (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
    1938            0 :                         ereport(ERROR,
    1939              :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1940              :                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
    1941              :                                         RelationGetRelationName(rel))));
    1942              : 
    1943              :                     /*
    1944              :                      * We don't allow specifying a column list for both the parent
    1945              :                      * and a child table at the same time, as it is not very
    1946              :                      * clear which one should be given preference.
    1947              :                      */
    1948          764 :                     if (childrelid != myrelid &&
    1949            0 :                         (t->columns || list_member_oid(relids_with_collist, childrelid)))
    1950            0 :                         ereport(ERROR,
    1951              :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    1952              :                                  errmsg("conflicting or redundant column lists for table \"%s\"",
    1953              :                                         RelationGetRelationName(rel))));
    1954              : 
    1955          764 :                     continue;
    1956              :                 }
    1957              : 
    1958              :                 /* find_all_inheritors already got lock */
    1959           15 :                 rel = table_open(childrelid, NoLock);
    1960           15 :                 pub_rel = palloc_object(PublicationRelInfo);
    1961           15 :                 pub_rel->relation = rel;
    1962              :                 /* child inherits WHERE clause from parent */
    1963           15 :                 pub_rel->whereClause = t->whereClause;
    1964              : 
    1965              :                 /* child inherits column list from parent */
    1966           15 :                 pub_rel->columns = t->columns;
    1967           15 :                 pub_rel->except = t->except;
    1968           15 :                 rels = lappend(rels, pub_rel);
    1969           15 :                 relids = lappend_oid(relids, childrelid);
    1970              : 
    1971           15 :                 if (t->whereClause)
    1972            1 :                     relids_with_rf = lappend_oid(relids_with_rf, childrelid);
    1973              : 
    1974           15 :                 if (t->columns)
    1975            0 :                     relids_with_collist = lappend_oid(relids_with_collist, childrelid);
    1976              :             }
    1977              :         }
    1978              :     }
    1979              : 
    1980          869 :     list_free(relids);
    1981          869 :     list_free(relids_with_rf);
    1982              : 
    1983          869 :     return rels;
    1984              : }
    1985              : 
    1986              : /*
    1987              :  * Close all relations in the list.
    1988              :  */
    1989              : static void
    1990         1029 : CloseTableList(List *rels)
    1991              : {
    1992              :     ListCell   *lc;
    1993              : 
    1994         2075 :     foreach(lc, rels)
    1995              :     {
    1996              :         PublicationRelInfo *pub_rel;
    1997              : 
    1998         1046 :         pub_rel = (PublicationRelInfo *) lfirst(lc);
    1999         1046 :         table_close(pub_rel->relation, NoLock);
    2000              :     }
    2001              : 
    2002         1029 :     list_free_deep(rels);
    2003         1029 : }
    2004              : 
    2005              : /*
    2006              :  * Lock the schemas specified in the schema list in AccessShareLock mode in
    2007              :  * order to prevent concurrent schema deletion.
    2008              :  */
    2009              : static void
    2010          726 : LockSchemaList(List *schemalist)
    2011              : {
    2012              :     ListCell   *lc;
    2013              : 
    2014          938 :     foreach(lc, schemalist)
    2015              :     {
    2016          212 :         Oid         schemaid = lfirst_oid(lc);
    2017              : 
    2018              :         /* Allow query cancel in case this takes a long time */
    2019          212 :         CHECK_FOR_INTERRUPTS();
    2020          212 :         LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
    2021              : 
    2022              :         /*
    2023              :          * It is possible that by the time we acquire the lock on the schema,
    2024              :          * concurrent DDL has removed it. We can test this by checking the
    2025              :          * existence of the schema.
    2026              :          */
    2027          212 :         if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
    2028            0 :             ereport(ERROR,
    2029              :                     errcode(ERRCODE_UNDEFINED_SCHEMA),
    2030              :                     errmsg("schema with OID %u does not exist", schemaid));
    2031              :     }
    2032          726 : }
    2033              : 
    2034              : /*
    2035              :  * Add listed tables to the publication.
    2036              :  */
    2037              : static void
    2038          735 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
    2039              :                      AlterPublicationStmt *stmt)
    2040              : {
    2041              :     ListCell   *lc;
    2042              : 
    2043         1463 :     foreach(lc, rels)
    2044              :     {
    2045          777 :         PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
    2046          777 :         Relation    rel = pub_rel->relation;
    2047              :         ObjectAddress obj;
    2048              : 
    2049              :         /* Must be owner of the table or superuser. */
    2050          777 :         if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
    2051            4 :             aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
    2052            4 :                            RelationGetRelationName(rel));
    2053              : 
    2054          773 :         obj = publication_add_relation(pubid, pub_rel, if_not_exists, stmt);
    2055          728 :         if (stmt)
    2056              :         {
    2057          407 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    2058              :                                              (Node *) stmt);
    2059              : 
    2060          407 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
    2061              :                                        obj.objectId, 0);
    2062              :         }
    2063              :     }
    2064          686 : }
    2065              : 
    2066              : /*
    2067              :  * Remove listed tables from the publication.
    2068              :  */
    2069              : static void
    2070          355 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
    2071              : {
    2072              :     ObjectAddress obj;
    2073              :     ListCell   *lc;
    2074              :     Oid         prid;
    2075              : 
    2076          673 :     foreach(lc, rels)
    2077              :     {
    2078          330 :         PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
    2079          330 :         Relation    rel = pubrel->relation;
    2080          330 :         Oid         relid = RelationGetRelid(rel);
    2081              : 
    2082          330 :         if (pubrel->columns)
    2083            0 :             ereport(ERROR,
    2084              :                     errcode(ERRCODE_SYNTAX_ERROR),
    2085              :                     errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
    2086              : 
    2087          330 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
    2088              :                                ObjectIdGetDatum(relid),
    2089              :                                ObjectIdGetDatum(pubid));
    2090          330 :         if (!OidIsValid(prid))
    2091              :         {
    2092            8 :             if (missing_ok)
    2093            0 :                 continue;
    2094              : 
    2095            8 :             ereport(ERROR,
    2096              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2097              :                      errmsg("relation \"%s\" is not part of the publication",
    2098              :                             RelationGetRelationName(rel))));
    2099              :         }
    2100              : 
    2101          322 :         if (pubrel->whereClause)
    2102            4 :             ereport(ERROR,
    2103              :                     (errcode(ERRCODE_SYNTAX_ERROR),
    2104              :                      errmsg("cannot use a WHERE clause when removing a table from a publication")));
    2105              : 
    2106          318 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
    2107          318 :         performDeletion(&obj, DROP_CASCADE, 0);
    2108              :     }
    2109          343 : }
    2110              : 
    2111              : /*
    2112              :  * Add listed schemas to the publication.
                         *
                         * pubid is the publication's OID and schemas is a list of schema OIDs
                         * (each cell is read with lfirst_oid).  if_not_exists is passed through
                         * unchanged to publication_add_schema().
                         *
                         * When stmt is non-NULL, each schema processed is also reported via
                         * EventTriggerCollectSimpleCommand() and InvokeObjectPostCreateHook();
                         * when stmt is NULL both notifications are skipped.
    2113              :  */
    2114              : static void
    2115          408 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
    2116              :                       AlterPublicationStmt *stmt)
    2117              : {
    2118              :     ListCell   *lc;
    2119              : 
    2120          571 :     foreach(lc, schemas)
    2121              :     {
    2122          171 :         Oid         schemaid = lfirst_oid(lc);
    2123              :         ObjectAddress obj;
    2124              : 
    2125          171 :         obj = publication_add_schema(pubid, schemaid, if_not_exists);
                                /* Notifications are only sent when a statement was supplied. */
    2126          163 :         if (stmt)
    2127              :         {
    2128           43 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
    2129              :                                              (Node *) stmt);
    2130              : 
    2131           43 :             InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
    2132              :                                        obj.objectId, 0);
    2133              :         }
    2134              :     }
    2135          400 : }
    2136              : 
    2137              : /*
    2138              :  * Remove listed schemas from the publication.
                         *
                         * schemas is a list of schema OIDs (each cell is read with lfirst_oid).
                         * For every schema, the OID of its mapping row for publication pubid is
                         * looked up through the PUBLICATIONNAMESPACEMAP syscache and that row is
                         * removed with performDeletion() using DROP_CASCADE.
                         *
                         * If a schema has no mapping row for this publication, it is silently
                         * skipped when missing_ok is true; otherwise an ERROR with
                         * ERRCODE_UNDEFINED_OBJECT is raised.
    2139              :  */
    2140              : static void
    2141          314 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
    2142              : {
    2143              :     ObjectAddress obj;
    2144              :     ListCell   *lc;
    2145              :     Oid         psid;
    2146              : 
    2147          347 :     foreach(lc, schemas)
    2148              :     {
    2149           37 :         Oid         schemaid = lfirst_oid(lc);
    2150              : 
                                /* Fetch the oid column of the (schema, publication) mapping row. */
    2151           37 :         psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
    2152              :                                Anum_pg_publication_namespace_oid,
    2153              :                                ObjectIdGetDatum(schemaid),
    2154              :                                ObjectIdGetDatum(pubid));
    2155           37 :         if (!OidIsValid(psid))
    2156              :         {
    2157            4 :             if (missing_ok)
    2158            0 :                 continue;
    2159              : 
    2160            4 :             ereport(ERROR,
    2161              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
    2162              :                      errmsg("tables from schema \"%s\" are not part of the publication",
    2163              :                             get_namespace_name(schemaid))));
    2164              :         }
    2165              : 
                                /* Delete the mapping row through the dependency machinery. */
    2166           33 :         ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
    2167           33 :         performDeletion(&obj, DROP_CASCADE, 0);
    2168              :     }
    2169          310 : }
    2170              : 
    2171              : /*
    2172              :  * Internal workhorse for changing a publication owner
                         *
                         * rel is the relation handed to CatalogTupleUpdate() (both callers in this
                         * file pass pg_publication opened with RowExclusiveLock) and tup is the
                         * publication's tuple.  Both callers pass a copy obtained with
                         * SearchSysCacheCopy1(); that matters because form->pubowner is
                         * overwritten in place in tup before the update is written.
                         *
                         * Returns without doing anything if the publication is already owned by
                         * newOwnerId.  If the current user is not a superuser, every check in the
                         * !superuser() branch below must pass first; a superuser skips all of
                         * them.
    2173              :  */
    2174              : static void
    2175           26 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    2176              : {
    2177              :     Form_pg_publication form;
    2178              : 
    2179           26 :     form = (Form_pg_publication) GETSTRUCT(tup);
    2180              : 
                            /* Nothing to do if the owner is not actually changing. */
    2181           26 :     if (form->pubowner == newOwnerId)
    2182            6 :         return;
    2183              : 
    2184           20 :     if (!superuser())
    2185              :     {
    2186              :         AclResult   aclresult;
    2187              : 
    2188              :         /* Must be owner */
    2189            8 :         if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
    2190            0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
    2191            0 :                            NameStr(form->pubname));
    2192              : 
    2193              :         /* Must be able to become new owner */
    2194            8 :         check_can_set_role(GetUserId(), newOwnerId);
    2195              : 
    2196              :         /* New owner must have CREATE privilege on database */
    2197            8 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
    2198            8 :         if (aclresult != ACLCHECK_OK)
    2199            0 :             aclcheck_error(aclresult, OBJECT_DATABASE,
    2200            0 :                            get_database_name(MyDatabaseId));
    2201              : 
                                /*
                                 * A non-superuser new owner is rejected when the publication is
                                 * flagged puballtables or puballsequences, or when
                                 * is_schema_publication() reports true for it.
                                 */
    2202            8 :         if (!superuser_arg(newOwnerId))
    2203              :         {
    2204            8 :             if (form->puballtables || form->puballsequences ||
    2205            4 :                 is_schema_publication(form->oid))
    2206            4 :                 ereport(ERROR,
    2207              :                         errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    2208              :                         errmsg("permission denied to change owner of publication \"%s\"",
    2209              :                                NameStr(form->pubname)),
    2210              :                         errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
    2211              :         }
    2212              :     }
    2213              : 
                            /* Store the new owner in the tuple and write it back to the catalog. */
    2214           16 :     form->pubowner = newOwnerId;
    2215           16 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
    2216              : 
    2217              :     /* Update owner dependency reference */
    2218           16 :     changeDependencyOnOwner(PublicationRelationId,
    2219              :                             form->oid,
    2220              :                             newOwnerId);
    2221              : 
    2222           16 :     InvokeObjectPostAlterHook(PublicationRelationId,
    2223              :                               form->oid, 0);
    2224              : }
    2225              : 
    2226              : /*
    2227              :  * Change publication owner -- by name
                         *
                         * Opens pg_publication with RowExclusiveLock, looks the publication up by
                         * name (taking a copy of its tuple), and hands the work to
                         * AlterPublicationOwner_internal().  Raises an ERROR with
                         * ERRCODE_UNDEFINED_OBJECT if no publication of that name exists.
                         *
                         * Returns the ObjectAddress of the publication.
    2228              :  */
    2229              : ObjectAddress
    2230           26 : AlterPublicationOwner(const char *name, Oid newOwnerId)
    2231              : {
    2232              :     Oid         pubid;
    2233              :     HeapTuple   tup;
    2234              :     Relation    rel;
    2235              :     ObjectAddress address;
    2236              :     Form_pg_publication pubform;
    2237              : 
    2238           26 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2239              : 
    2240           26 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
    2241              : 
    2242           26 :     if (!HeapTupleIsValid(tup))
    2243            0 :         ereport(ERROR,
    2244              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2245              :                  errmsg("publication \"%s\" does not exist", name)));
    2246              : 
                            /* Remember the OID now; it is needed for the returned address. */
    2247           26 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
    2248           26 :     pubid = pubform->oid;
    2249              : 
    2250           26 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2251              : 
    2252           22 :     ObjectAddressSet(address, PublicationRelationId, pubid);
    2253              : 
    2254           22 :     heap_freetuple(tup);
    2255              : 
    2256           22 :     table_close(rel, RowExclusiveLock);
    2257              : 
    2258           22 :     return address;
    2259              : }
    2260              : 
    2261              : /*
    2262              :  * Change publication owner -- by OID
                         *
                         * Same as the by-name variant except that the publication is looked up
                         * through the PUBLICATIONOID syscache and nothing is returned.  Raises an
                         * ERROR with ERRCODE_UNDEFINED_OBJECT if no publication has that OID.
    2263              :  */
    2264              : void
    2265            0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
    2266              : {
    2267              :     HeapTuple   tup;
    2268              :     Relation    rel;
    2269              : 
    2270            0 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
    2271              : 
                            /* Work on a copy of the tuple; the internal routine modifies it. */
    2272            0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
    2273              : 
    2274            0 :     if (!HeapTupleIsValid(tup))
    2275            0 :         ereport(ERROR,
    2276              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2277              :                  errmsg("publication with OID %u does not exist", pubid)));
    2278              : 
    2279            0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
    2280              : 
    2281            0 :     heap_freetuple(tup);
    2282              : 
    2283            0 :     table_close(rel, RowExclusiveLock);
    2284            0 : }
    2285              : 
    2286              : /*
    2287              :  * Extract the publish_generated_columns option value from a DefElem. "stored"
    2288              :  * and "none" values are accepted.
                         *
                         * The value is compared case-insensitively (pg_strcasecmp).  Returns
                         * PUBLISH_GENCOLS_NONE for "none" and PUBLISH_GENCOLS_STORED for "stored".
                         * A missing argument or any other value raises an ERROR with
                         * ERRCODE_SYNTAX_ERROR.
    2289              :  */
    2290              : static char
    2291           49 : defGetGeneratedColsOption(DefElem *def)
    2292              : {
                            /* Starts as "" so the error message has something to print when no value was given. */
    2293           49 :     char       *sval = "";
    2294              : 
    2295              :     /*
    2296              :      * A parameter value is required.
    2297              :      */
    2298           49 :     if (def->arg)
    2299              :     {
    2300           45 :         sval = defGetString(def);
    2301              : 
    2302           45 :         if (pg_strcasecmp(sval, "none") == 0)
    2303           14 :             return PUBLISH_GENCOLS_NONE;
    2304           31 :         if (pg_strcasecmp(sval, "stored") == 0)
    2305           27 :             return PUBLISH_GENCOLS_STORED;
    2306              :     }
    2307              : 
                            /* Reached when no value was supplied or the value was not recognized. */
    2308            8 :     ereport(ERROR,
    2309              :             errcode(ERRCODE_SYNTAX_ERROR),
    2310              :             errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
    2311              :             errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
    2312              : 
    2313              :     return PUBLISH_GENCOLS_NONE;    /* keep compiler quiet */
    2314              : }
        

Generated by: LCOV version 2.0-1